
Uploading files to HDFS with Java

Published: 2023-07-19 09:25:48

Q1: I'm just starting to learn Spark and want to upload files to HDFS. Do I need Hadoop first, and then write Java programs using something like Eclipse?

Spark treats HDFS as a data source, so you still need the HDFS storage layer set up; after that, the only change is that you program against Spark instead of Hadoop (MapReduce). Whether you use Eclipse doesn't matter, as long as you can compile and run the code.
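For orientation, here is a minimal sketch of a Spark job in Java that reads a file already uploaded to HDFS. The namenode address (hdfs://master:9000) and the input path are assumptions for the example, not values from the question.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsReadDemo {
    public static void main(String[] args) {
        // local[*] is only for trying this out on one machine; drop it when submitting to a cluster
        SparkConf conf = new SparkConf().setAppName("hdfs-read-demo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Spark reads the HDFS path like any other data source
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/user/hadoop/input.log");
        System.out.println("line count: " + lines.count());

        sc.stop();
    }
}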

Q2: When uploading files to HDFS with Java, how can resumable (pause/resume) uploads be implemented?
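The code below sketches one way to do it: the client splits the file into chunks, and a Spring-based servlet dispatches the chunk-level actions (prepareUpload, pauseFile, resumeFile, CRC verification of unchecked parts, progress queries), so that an interrupted transfer can resume from the last verified chunk. The actual write to storage is hidden behind the UploadProcessor; a sketch of the HDFS side of resuming follows the servlet.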

@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
        implements HttpRequestHandler {

    private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

    @Autowired
    UploadProcessor uploadProcessor;

    @Autowired
    FileUploaderHelper fileUploaderHelper;

    @Autowired
    ExceptionCodeMappingHelper exceptionCodeMappingHelper;

    @Autowired
    Authorizer authorizer;

    @Autowired
    StaticStateIdentifierManager staticStateIdentifierManager;

    @Override
    public void handleRequest(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        log.trace("Handling request");

        Serializable jsonObject = null;
        try {
            // extract the action from the request
            UploadServletAction actionByParameterName =
                    UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));

            // check authorization
            checkAuthorization(request, actionByParameterName);

            // then process the asked action
            jsonObject = processAction(actionByParameterName, request);

            // if something has to be written to the response
            if (jsonObject != null) {
                fileUploaderHelper.writeToResponse(jsonObject, response);
            }
        }
        // if an exception occurs, write it to the response
        catch (Exception e) {
            exceptionCodeMappingHelper.processException(e, response);
        }
    }

    private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
            throws MissingParameterException, AuthorizationException {

        // check authorization, except for getProgress (we do not really care about
        // authorization for progress queries, and it uses an array of file ids)
        if (!actionByParameterName.equals(UploadServletAction.getProgress)) {

            // extract the file id
            final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);

            // if this is getConfig, the identifier is the one given as a parameter
            UUID clientOrJobId;
            String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
            if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) {
                clientOrJobId = UUID.fromString(parameter);
            }
            // otherwise, get it from the identifier manager
            else {
                clientOrJobId = staticStateIdentifierManager.getIdentifier();
            }

            // call the authorizer
            authorizer.getAuthorization(
                    request,
                    actionByParameterName,
                    clientOrJobId,
                    fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
        }
    }

    private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
            throws Exception {
        log.debug("Processing action " + actionByParameterName.name());

        Serializable returnObject = null;
        switch (actionByParameterName) {
            case getConfig:
                String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
                returnObject =
                        uploadProcessor.getConfig(
                                parameterValue != null ? UUID.fromString(parameterValue) : null);
                break;
            case verifyCrcOfUncheckedPart:
                returnObject = verifyCrcOfUncheckedPart(request);
                break;
            case prepareUpload:
                returnObject = prepareUpload(request);
                break;
            case clearFile:
                uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
                break;
            case clearAll:
                uploadProcessor.clearAll();
                break;
            case pauseFile:
                List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
                uploadProcessor.pauseFile(uuids);
                break;
            case resumeFile:
                returnObject =
                        uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
                break;
            case setRate:
                uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
                        Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
                break;
            case getProgress:
                returnObject = getProgress(request);
                break;
        }
        return returnObject;
    }

    List<UUID> getFileIdsFromString(String fileIds) {
        String[] splittedFileIds = fileIds.split(",");
        List<UUID> uuids = Lists.newArrayList();
        for (int i = 0; i < splittedFileIds.length; i++) {
            uuids.add(UUID.fromString(splittedFileIds[i]));
        }
        return uuids;
    }

    private Serializable getProgress(HttpServletRequest request)
            throws MissingParameterException {
        Serializable returnObject;
        String[] ids =
                new Gson()
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
        Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {

            @Override
            public UUID apply(String input) {
                return UUID.fromString(input);
            }
        });
        returnObject = Maps.newHashMap();
        for (UUID fileId : uuids) {
            try {
                ProgressJson progress = uploadProcessor.getProgress(fileId);
                ((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);
            }
            catch (FileNotFoundException e) {
                log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
            }
        }
        return returnObject;
    }

    private Serializable prepareUpload(HttpServletRequest request)
            throws MissingParameterException, IOException {

        // extract file information
        PrepareUploadJson[] fromJson =
                new Gson()
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);

        // prepare the uploads
        final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);

        // return the generated identifiers
        return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {

            public String apply(UUID input) {
                return input.toString();
            }
        }));
    }

    private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
            throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
        UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
        try {
            uploadProcessor.verifyCrcOfUncheckedPart(fileId,
                    fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
        }
        catch (InvalidCrcException e) {
            // no need to log this exception: a fallback behaviour is defined in the
            // throwing method, but we need to return something
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }
}
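On the HDFS side, resuming can be implemented by checking how many bytes already landed in the target file and appending only the remainder. Below is a minimal sketch under assumptions not in the original: a namenode at hdfs://master:9000, placeholder paths, and a cluster with append support enabled (dfs.support.append on older versions).

import java.io.RandomAccessFile;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ResumableHdfsUpload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);

        Path dst = new Path("/uploads/big.bin"); // hypothetical target path
        String src = "/tmp/big.bin";             // hypothetical local source

        // how much of the file already made it to HDFS in a previous attempt
        long offset = fs.exists(dst) ? fs.getFileStatus(dst).getLen() : 0L;

        // append the remaining bytes, starting from the recorded offset
        try (FSDataOutputStream out = offset > 0 ? fs.append(dst) : fs.create(dst);
             RandomAccessFile in = new RandomAccessFile(src, "r")) {
            in.seek(offset);
            byte[] buf = new byte[64 * 1024];
            int n;
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
        }
        fs.close();
    }
}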

Q3: How can users upload and download files to and from HDFS through a web page?


Hadoop computations run on the HDFS file system, so before each job all the required files (which we call raw files) must be uploaded to HDFS. There are generally three ways to upload files to HDFS:

a. Hadoop's built-in dfs service, using put (hadoop fs -put localfile /hdfs/path);

b. the Hadoop API, where a Writer object (SequenceFile.Writer) can do the job (see the sketch after the comparison below);

c. calling an OTL executable, so that data goes from the database directly into Hadoop.

Since an ETL layer already exists, the third option is not considered.

Comparing options a and b:

1. Space: with option a, files occupy the same space on HDFS as they do locally, so if only log files are uploaded, one month of logs consumes about 10 TB; adding the various dimension and fact tables produced over the same period brings the total to roughly 25 TB.

Option b, in our tests, compresses at a ratio of about 3-4:1, so 100 TB of HDFS capacity that previously held only about 4 months of data can now hold about a year.

2. Upload time: with option a, uploading 200 GB of data took about one hour in our tests.

With option b, a completely unoptimized program takes roughly 4-6 times as long, though there is some room to speed it up.

3. Computation time: in tests on 200 GB of data (about 400 million records), IO-bound programs run about 50% faster on compressed data, while memory-bound programs gain only 5-10%.

4. Other: uncompressed data has the additional benefit that the raw data can be inspected directly on HDFS. To view the raw contents of compressed data, you must either export it to the local machine with a program or rely on a local backup copy.

Compression format: according to the Hadoop API documentation, there are two compression types, BLOCK and RECORD. RECORD compresses only the values, so BLOCK compression is what is generally used.
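To illustrate option b, here is a minimal sketch that writes a local log file into a BLOCK-compressed SequenceFile on HDFS. The namenode address, the paths, and the key/value choice (line number as key, line text as value) are assumptions for the example.

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class CompressedUploader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);

        // write a BLOCK-compressed SequenceFile, one record per input line
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, new Path("/data/logs/2023-07.seq"),
                LongWritable.class, Text.class, CompressionType.BLOCK);
        try (BufferedReader reader = new BufferedReader(new FileReader("/tmp/2023-07.log"))) {
            String line;
            long lineNo = 0;
            while ((line = reader.readLine()) != null) {
                writer.append(new LongWritable(lineNo++), new Text(line));
            }
        } finally {
            writer.close();
        }
    }
}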

To run computations over the compressed files, the job must read them with the SequenceFileInputFormat class. Typical configuration code for such a job:

JobConf conf = new JobConf(getConf(), log.class);
conf.setJobName("log");
conf.setOutputKeyClass(Text.class); // set the map output key type
conf.setOutputValueClass(Text.class); // set the map output value type

conf.setMapperClass(MapClass.class);
// conf.setCombinerClass(Reduce.class); // optional combiner
conf.setReducerClass(Reduce.class);
conf.setInputFormat(SequenceFileInputFormat.class); // necessary when reading compressed input

From this point on, processing is the same as for the uncompressed format.

Q4: A problem with a Java program that uploads local files to HDFS

Change this line:

FileSystem hdfs = FileSystem.get(config);

to this:

FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);

The first call returns the local file system object (with no fs.defaultFS configured on the classpath, the default is file:///), while the second explicitly obtains the HDFS file system object. Use the first form only when you actually want to operate on local files. I made the same mistake when starting out on 2.7.4, and changing it to the second form fixed the error.
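Putting it together, a minimal sketch of the upload; the namenode address and the paths are placeholders for the example:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUpload {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // explicitly target the HDFS cluster rather than the local file system
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);

        // copy a local file up to HDFS
        hdfs.copyFromLocalFile(new Path("/tmp/report.csv"), new Path("/user/hadoop/report.csv"));
        hdfs.close();
    }
}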
