当前位置:首页 » 文件管理 » hdfs上传文件

hdfs上传文件

发布时间: 2022-01-10 02:22:39

㈠ 如何实现让用户在网页中上传下载文件到HDFS中

hadoop计算需要在hdfs文件系统上进行,文件上传到hdfs上通常有三种方法:a hadoop自带的dfs服务,put;b hadoop的API,Writer对象可以实现这一功能;c 调用OTL可执行程序,数据从数据库直接进入hadoop

hadoop计算需要在hdfs文件系统上进行,因此每次计算之前必须把需要用到的文件(我们称为原始文件)都上传到hdfs上。文件上传到hdfs上通常有三种方法:

a hadoop自带的dfs服务,put;

b hadoop的API,Writer对象可以实现这一功能;

c 调用OTL可执行程序,数据从数据库直接进入hadoop

由于存在ETL层,因此第三种方案不予考虑

将a、b方案进行对比,如下:

1 空间:方案a在hdfs上占用空间同本地,因此假设只上传日志文件,则保存一个月日志文件将消耗掉约10T空间,如果加上这期间的各种维表、事实表,将占用大约25T空间

方案b经测试,压缩比大约为3~4:1,因此假设hdfs空间为100T,原来只能保存约4个月的数据,现在可以保存约1年

2 上传时间:方案a的上传时间经测试,200G数据上传约1小时

方案b的上传时间,程序不做任何优化,大约是以上的4~6倍,但存在一定程度提升速度的余地

3 运算时间:经过对200G数据,大约4亿条记录的测试,如果程序以IO操作为主,则压缩数据的计算可以提高大约50%的速度,但如果程序以内存操作为主,则只能提高5%~10%的速度

4 其它:未压缩的数据还有一个好处是可以直接在hdfs上查看原始数据。压缩数据想看原始数据只能用程序把它导到本地,或者利用本地备份数据

压缩格式:按照hadoop api的介绍,压缩格式分两种:BLOCK和RECORD,其中RECORD是只对value进行压缩,一般采用BLOCK进行压缩。

对压缩文件进行计算,需要用SequenceFileInputFormat类来读入压缩文件,以下是计算程序的典型配置代码:

JobConf conf = new JobConf(getConf(), log.class);
conf.setJobName(”log”);
conf.setOutputKeyClass(Text.class);//set the map output key type
conf.setOutputValueClass(Text.class);//set the map output value type

conf.setMapperClass(MapClass.class);
//conf.setCombinerClass(Rece.class);//set the combiner class ,if havenot, use Recuce class for default
conf.setRecerClass(Rece.class);
conf.setInputFormat(SequenceFileInputFormat.class);//necessary if use compress

接下来的处理与非压缩格式的处理一样

㈡ hadoop上传文件效率特别慢,怎么解决

基于任何平台实现的云盘系统,面临的首要的技术问题就是客户端上传和下载效率优化问题。基于Hadoop实现的云盘系统,受到Hadoop文件读写机制的影响,采用Hadoop提供的API进行HDFS文件系统访问,文件读取时默认是顺序、逐block读取;写入时是顺序写入。

一、读写机制

首先来看文件读取机制:尽管DataNode实现了文件存储空间的水平扩展和多副本机制,但是针对单个具体文件的读取,Hadoop默认的API接口并没有提供多DataNode的并行读取机制。基于Hadoop提供的API接口实现的云盘客户端也自然面临同样的问题。Hadoop的文件读取流程如下图所示:

使用HDFS提供的客户端开发库,向远程的Namenode发起RPC请求;

Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;

当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表, 列表的大小根据在Namenode中对replication的设置而定。开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把packet以流的方式写入第一个 datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个 datanode,这种写数据的方式呈流水线的形式。

最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。

如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的 pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的 datanode,保持replicas设定的数量。

关键词:开发库把packet以流的方式写入第一个datanode,该datanode将其传递给pipeline中的下一个datanode,知道最后一个Datanode,这种写数据的方式呈流水线方式。

二、解决方案

1.下载效率优化

通过以上读写机制的分析,我们可以发现基于Hadoop实现的云盘客户段下载效率的优化可以从两个层级着手:

1.文件整体层面:采用并行访问多线程(多进程)份多文件并行读取。

2.Block块读取:改写Hadoop接口扩展,多Block并行读取。

2.上传效率优化

上传效率优化只能采用文件整体层面的并行处理,不支持分Block机制的多Block并行读取。

HDFS处理大量小文件时的问题

小文件指的是那些size比HDFS 的block size(默认64M)小的多的文件。如果在HDFS中存储小文件,那么在HDFS中肯定会含有许许多多这样的小文件(不然就不会用hadoop了)。
而HDFS的问题在于无法很有效的处理大量小文件。

任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中,没一个object占用150 bytes的内存空间。所以,如果有10million个文件,
没一个文件对应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息。如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限。

不仅如此,HDFS并不是为了有效的处理大量小文件而存在的。它主要是为了流式的访问大文件而设计的。对小文件的读取通常会造成大量从
datanode到datanode的seeks和hopping来retrieve文件,而这样是非常的低效的一种访问方式。

大量小文件在maprece中的问题

Map tasks通常是每次处理一个block的input(默认使用FileInputFormat)。如果文件非常的小,并且拥有大量的这种小文件,那么每一个map task都仅仅处理了非常小的input数据,
并且会产生大量的map tasks,每一个map task都会消耗一定量的bookkeeping的资源。比较一个1GB的文件,默认block size为64M,和1Gb的文件,没一个文件100KB,
那么后者没一个小文件使用一个map task,那么job的时间将会十倍甚至百倍慢于前者。

hadoop中有一些特性可以用来减轻这种问题:可以在一个JVM中允许task reuse,以支持在一个JVM中运行多个map task,以此来减少一些JVM的启动消耗
(通过设置mapred.job.reuse.jvm.num.tasks属性,默认为1,-1为无限制)。另一种方法为使用MultiFileInputSplit,它可以使得一个map中能够处理多个split。

为什么会产生大量的小文件?

至少有两种情况下会产生大量的小文件

1. 这些小文件都是一个大的逻辑文件的pieces。由于HDFS仅仅在不久前才刚刚支持对文件的append,因此以前用来向unbounde files(例如log文件)添加内容的方式都是通过将这些数据用许多chunks的方式写入HDFS中。
2. 文件本身就是很小。例如许许多多的小图片文件。每一个图片都是一个独立的文件。并且没有一种很有效的方法来将这些文件合并为一个大的文件

这两种情况需要有不同的解决方 式。对于第一种情况,文件是由许许多多的records组成的,那么可以通过件邪行的调用HDFS的sync()方法(和append方法结合使用)来解 决。或者,可以通过些一个程序来专门合并这些小文件(see Nathan Marz’s post about a tool called the Consolidator which does exactly this).

对于第二种情况,就需要某种形式的容器来通过某种方式来group这些file。hadoop提供了一些选择:

* HAR files

Hadoop Archives (HAR files)是在0.18.0版本中引入的,它的出现就是为了缓解大量小文件消耗namenode内存的问题。HAR文件是通过在HDFS上构建一个层次化的文件系统来工作。一个HAR文件是通过hadoop的archive命令来创建,而这个命令实 际上也是运行了一个MapRece任务来将小文件打包成HAR。对于client端来说,使用HAR文件没有任何影响。所有的原始文件都 visible && accessible(using har://URL)。但在HDFS端它内部的文件数减少了。

通 过HAR来读取一个文件并不会比直接从HDFS中读取文件高效,而且实际上可能还会稍微低效一点,因为对每一个HAR文件的访问都需要完成两层index 文件的读取和文件本身数据的读取(见上图)。并且尽管HAR文件可以被用来作为MapRece job的input,但是并没有特殊的方法来使maps将HAR文件中打包的文件当作一个HDFS文件处理。 可以考虑通过创建一种input format,利用HAR文件的优势来提高MapRece的效率,但是目前还没有人作这种input format。 需要注意的是:MultiFileInputSplit,即使在HADOOP-4565的改进(choose files in a split that are node local),但始终还是需要seek per small file。

* Sequence Files

通 常对于“the small files problem”的回应会是:使用SequenceFile。这种方法是说,使用filename作为key,并且file contents作为value。实践中这种方式非常管用。回到10000个100KB的文件,可以写一个程序来将这些小文件写入到一个单独的 SequenceFile中去,然后就可以在一个streaming fashion(directly or using maprece)中来使用这个sequenceFile。不仅如此,SequenceFiles也是splittable的,所以maprece 可以break them into chunks,并且分别的被独立的处理。和HAR不同的是,这种方式还支持压缩。block的压缩在许多情况下都是最好的选择,因为它将多个 records压缩到一起,而不是一个record一个压缩。

将已有的许多小文件转换成一个SequenceFiles可能会比较慢。但 是,完全有可能通过并行的方式来创建一个一系列的SequenceFiles。(Stuart Sierra has written a very useful post about converting a tar file into a SequenceFile — tools like this are very useful).更进一步,如果有可能最好设计自己的数据pipeline来将数据直接写入一个SequenceFile。

阅读全文

㈢ 关于用java写程序把本地文件上传到HDFS中的问题

将这FileSystem hdfs = FileSystem.get(config);
改成FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"),config)
上面那句取得的是本地文件系统对象,改成下面这个才是取得hdfs文件系统对象,当你要操作本地文件对象的时候就要用上面那句取得本地文件对象,我在2.7.4刚开始也是跟你一样的错误,改为下面的就可以了

㈣ hadoop 本地文件上传到 hdfs 失败

看着像是datanode出问题了,你去master的50070端口看看datanode是否全部正常启动。

㈤ java调用hadoop api向hdfs上传文件问题

var baseText3=null
function srsd(){
var popUp3=document.getElementById("popupcontent3");
popUp3.style.top="";
popUp3.style.left="";
if (baseText3==null){
baseText3=popUp3.innerHTML;
popUp3.innerHTML=baseText3+"<div id=\"statusbar3\"><a onclick=\"hidePopup3();\">
</a></div>";
}

㈥ hdfs 上传文件 ioutils 和fromlocalfile 有什么区别

ystem.FromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)等通过FileSystem操作文件的所以就追踪了一下FileSystem.FromLocalFile的执行过程。

[java] view plain
<span style="font-size:18px;"> public void FromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}</span>

//调用了 FileUtil.(),进到这个()里:

[java] view plain
<span style="font-size:18px;"> public static boolean (FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource,
boolean overwrite, Configuration conf) throws IOException {
FileStatus fileStatus = srcFS.getFileSt

㈦ 用java向hdfs上传文件时,如何实现断点续传

@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler {

private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

@Autowired
UploadProcessor uploadProcessor;

@Autowired
FileUploaderHelper fileUploaderHelper;

@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;

@Autowired
Authorizer authorizer;

@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;

@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException {
log.trace("Handling request");

Serializable jsonObject = null;
try {
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));

// check authorization
checkAuthorization(request, actionByParameterName);

// then process the asked action
jsonObject = processAction(actionByParameterName, request);

// if something has to be written to the response
if (jsonObject != null) {
fileUploaderHelper.writeToResponse(jsonObject, response);
}

}
// If exception, write it
catch (Exception e) {
exceptionCodeMappingHelper.processException(e, response);
}

}

private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException {

// check authorization
// if its not get progress (because we do not really care about authorization for get
// progress and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress)) {

// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);

// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) {
clientOrJobId = UUID.fromString(parameter);
}
// if not, get it from manager
else {
clientOrJobId = staticStateIdentifierManager.getIdentifier();
}

// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);

}
}

private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception {
log.debug("Processing action " + actionByParameterName.name());

Serializable returnObject = null;
switch (actionByParameterName) {
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;
}
return returnObject;
}

List<UUID> getFileIdsFromString(String fileIds) {
String[] splittedFileIds = fileIds.split(",");
List<UUID> uuids = Lists.newArrayList();
for (int i = 0; i < splittedFileIds.length; i++) {
uuids.add(UUID.fromString(splittedFileIds[i]));
}
return uuids;
}

private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException {
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {

@Override
public UUID apply(String input) {
return UUID.fromString(input);
}

});
returnObject = Maps.newHashMap();
for (UUID fileId : uuids) {
try {
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);
}
catch (FileNotFoundException e) {
log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
}
}
return returnObject;
}

private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException {

// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);

// prepare them
final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);

// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {

public String apply(UUID input) {
return input.toString();
};
}));
}

private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try {
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
}
catch (InvalidCrcException e) {
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}

㈧ 关于hadoop 本地文件向HDFS上传的问题

如何上传的?api么?你写两个不同的文件试试
先确定问题在哪,如果有错误日志贴出来

㈨ eclipse 向hdfs 上传文件为空怎么解决

eclipse 向hdfs 上传文件为空怎么解决
首先,代码稍微改一下【】---->【】其次,你这种做法是无法上传文件的,只是将form中的所有数据写到文件中。必须要能判断【request.getInputStream();】流中,哪些是文件流,哪些是文本域的流或者其他信息的判断。我这边帮你写了一个,用到了一个组件【】two.jsp:MyJSP'UpLoadProcess.jsp'startingpageitems=upload.parseRequest(request);Iteratorit=items.iterator();System.out.println(it.hasNext());while(it.hasNext()){FileItemitem=(FileItem)it.next();if(!item.isFormField()){FilefullFile=newFile(item.getName());FilesavedFile=newFile(uploadPath,fullFile.getName());item.write(savedFile);}}}%>

㈩ 如何远程上传文件到hadoop中

全用以下命令上传文件到Hadoop上:

hadoopfs-putlocal_file_name/user/hadoop/

其中,/user/hadoop/为HDFS上的路径。local_file_name为需要上传的文件名。

热点内容
猫追蝴蝶编程 发布:2025-01-09 14:30:05 浏览:354
花生视频脚本 发布:2025-01-09 13:43:38 浏览:839
锁相环c语言 发布:2025-01-09 13:43:37 浏览:969
e语言盗号源码 发布:2025-01-09 13:35:25 浏览:830
宋plus副驾驶屏什么配置才有 发布:2025-01-09 13:26:38 浏览:908
他有毒缓存 发布:2025-01-09 13:04:35 浏览:264
文件夹太大怎么删 发布:2025-01-09 12:52:17 浏览:98
为什么安卓不像苹果用假后台 发布:2025-01-09 12:30:42 浏览:651
linux算法 发布:2025-01-09 12:27:51 浏览:188
2048游戏c语言代码 发布:2025-01-09 12:14:53 浏览:605