Uploading files to HDFS from Java
Q1. I've just started learning Spark and want to upload files to HDFS. Do I need Hadoop first and then write the Java code in something like Eclipse?
Spark treats HDFS as just another data source, so the storage side stays the same; only the programming layer changes from Hadoop (MapReduce) to Spark. Whether you use Eclipse makes no difference, as long as you can compile and run the code.
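For instance, once files are on HDFS, a minimal Java sketch of reading them from Spark might look like this (the hdfs://master:9000 address and the input path are placeholders for your own cluster):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsLineCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hdfs-line-count");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Spark pulls the file straight from HDFS, just as a MapReduce job would
            JavaRDD<String> lines = sc.textFile("hdfs://master:9000/user/hadoop/input.txt");
            System.out.println("lines: " + lines.count());
        }
    }
}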
Q2. How do I implement resumable uploads when uploading files to HDFS with Java?
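The servlet below dispatches every upload-related action (get config, prepare upload, pause, resume, rate limiting, CRC verification of an unchecked part, progress polling) to an UploadProcessor; pausing, resuming, and CRC-checking chunks is what makes the upload resumable. For reference, these are the standard imports it relies on (imports for the project's own classes such as UploadProcessor, FileUploaderHelper, UploadServletAction, and the custom exceptions are omitted, since their packages depend on the project):

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.context.support.HttpRequestHandlerServlet;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;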
@Component("javaLargeFileUploaderServlet") 
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" }) 
public class UploadServlet extends HttpRequestHandlerServlet 
        implements HttpRequestHandler { 
   
    private static final Logger log = LoggerFactory.getLogger(UploadServlet.class); 
   
    @Autowired 
    UploadProcessor uploadProcessor; 
   
    @Autowired 
    FileUploaderHelper fileUploaderHelper; 
   
    @Autowired 
    ExceptionCodeMappingHelper exceptionCodeMappingHelper; 
   
    @Autowired 
    Authorizer authorizer; 
   
    @Autowired 
    StaticStateIdentifierManager staticStateIdentifierManager;

    @Override
    public void handleRequest(HttpServletRequest request, HttpServletResponse response) 
            throws IOException { 
        log.trace("Handling request"); 
   
        Serializable jsonObject = null; 
        try { 
            // extract the action from the request 
            UploadServletAction actionByParameterName = 
                    UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action)); 
   
            // check authorization 
            checkAuthorization(request, actionByParameterName); 
   
            // then process the asked action 
            jsonObject = processAction(actionByParameterName, request);

            // if something has to be written to the response
            if (jsonObject != null) { 
                fileUploaderHelper.writeToResponse(jsonObject, response); 
            } 
   
        } 
        // If exception, write it 
        catch (Exception e) { 
            exceptionCodeMappingHelper.processException(e, response); 
        } 
   
    }

    private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
            throws MissingParameterException, AuthorizationException { 
   
        // check authorization 
        // if its not get progress (because we do not really care about authorization for get 
        // progress and it uses an array of file ids) 
        if (!actionByParameterName.equals(UploadServletAction.getProgress)) { 
   
            // extract uuid 
            final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false); 
   
            // if this is init, the identifier is the one in parameter 
            UUID clientOrJobId; 
            String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false); 
            if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) { 
                clientOrJobId = UUID.fromString(parameter); 
            } 
            // if not, get it from manager 
            else { 
                clientOrJobId = staticStateIdentifierManager.getIdentifier(); 
            }

            // call authorizer
            authorizer.getAuthorization( 
                    request, 
                    actionByParameterName, 
                    clientOrJobId, 
                    fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null); 
   
        } 
    }

    private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
            throws Exception { 
        log.debug("Processing action " + actionByParameterName.name()); 
   
        Serializable returnObject = null; 
        switch (actionByParameterName) { 
            case getConfig: 
                String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false); 
                returnObject = 
                        uploadProcessor.getConfig( 
                                parameterValue != null ? UUID.fromString(parameterValue) : null); 
                break; 
            case verifyCrcOfUncheckedPart: 
                returnObject = verifyCrcOfUncheckedPart(request); 
                break; 
            case prepareUpload: 
                returnObject = prepareUpload(request); 
                break; 
            case clearFile: 
                uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))); 
                break; 
            case clearAll: 
                uploadProcessor.clearAll(); 
                break; 
            case pauseFile: 
                List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)); 
                uploadProcessor.pauseFile(uuids); 
                break; 
            case resumeFile: 
                returnObject = 
                        uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))); 
                break; 
            case setRate: 
                uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)), 
                        Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate))); 
                break; 
            case getProgress: 
                returnObject = getProgress(request); 
                break; 
        } 
        return returnObject; 
    }

    List<UUID> getFileIdsFromString(String fileIds) {
        String[] splittedFileIds = fileIds.split(","); 
        List<UUID> uuids = Lists.newArrayList(); 
        for (int i = 0; i < splittedFileIds.length; i++) { 
            uuids.add(UUID.fromString(splittedFileIds[i])); 
        }  
        return uuids; 
    }

    private Serializable getProgress(HttpServletRequest request)
            throws MissingParameterException { 
        String[] ids = 
                new Gson() 
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class); 
        Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() { 
   
            @Override 
            public UUID apply(String input) { 
                return UUID.fromString(input); 
            } 
   
        }); 
        // typed map instead of the original unchecked cast to HashMap
        HashMap<String, ProgressJson> progressByFileId = Maps.newHashMap();
        for (UUID fileId : uuids) {
            try {
                ProgressJson progress = uploadProcessor.getProgress(fileId);
                progressByFileId.put(fileId.toString(), progress);
            } 
            catch (FileNotFoundException e) { 
                log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage()); 
            } 
        } 
        return progressByFileId;
    }

    private Serializable prepareUpload(HttpServletRequest request)
            throws MissingParameterException, IOException { 
   
        // extract file information 
        PrepareUploadJson[] fromJson = 
                new Gson() 
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class); 
   
        // prepare them 
        final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson); 
   
        // return them 
        return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() { 
   
            public String apply(UUID input) { 
                return input.toString(); 
            }
        })); 
    }

    private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
            throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException { 
        UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)); 
        try { 
            uploadProcessor.verifyCrcOfUncheckedPart(fileId, 
                    fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc)); 
        } 
        catch (InvalidCrcException e) { 
            // no need to log this exception, a fallback behaviour is defined in the 
            // throwing method. 
            // but we need to return something! 
            return Boolean.FALSE; 
        } 
        return Boolean.TRUE; 
    } 
}
Q3. How can users upload and download files to HDFS from a web page?
  Hadoop jobs run against the HDFS file system, so before every computation the input files (the raw files) must be uploaded to HDFS. There are usually three ways to do this:
    a. the dfs shell that ships with Hadoop (e.g. `hadoop dfs -put`);
    b. the Hadoop API, where a Writer object (SequenceFile.Writer) can do the job (a sketch follows the compression notes below);
    c. calling an OTL executable, so data flows from the database straight into Hadoop.
  Since an ETL layer already exists, option c was ruled out.
  Comparing options a and b:
    1. Space: option a uses the same HDFS space as the local files, so storing one month of log files alone consumes about 10T, and adding the dimension and fact tables from the same period brings the total to roughly 25T.
       Option b, in our tests, compresses at roughly 3-4:1, so with 100T of HDFS space we can keep about a year of data instead of about 4 months.
    2. Upload time: option a uploads 200G in about 1 hour in our tests.
       Option b, with the program completely unoptimized, takes about 4-6 times as long, though there is some room to speed it up.
    3. Computation time: tested on 200G of data (about 400 million records), compressed input makes an I/O-bound job roughly 50% faster, but a memory-bound job only 5%-10% faster.
    4. Other: uncompressed data has the extra benefit that the raw files can be inspected directly on HDFS. To inspect compressed data you must either export it to the local machine with a program or fall back on a local backup copy.
    Compression format: per the Hadoop API docs there are two compression types, BLOCK and RECORD; RECORD compresses only the values, so BLOCK compression is what is normally used.
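    For the upload side of option b, a minimal sketch of such a Writer (the namenode address and both paths are placeholders) might look like this:

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class LogToSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);
        // one record per log line, BLOCK-compressed as recommended above
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                new Path("/data/logs.seq"), LongWritable.class, Text.class, CompressionType.BLOCK);
        try (BufferedReader in = new BufferedReader(new FileReader("/local/logs/app.log"))) {
            String line;
            long lineNo = 0;
            while ((line = in.readLine()) != null) {
                writer.append(new LongWritable(lineNo++), new Text(line));
            }
        } finally {
            writer.close();
        }
    }
}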
    To compute over the compressed files, the job must read them with the SequenceFileInputFormat class. Typical configuration code looks like this:
JobConf conf = new JobConf(getConf(), log.class);
    conf.setJobName("log");
    conf.setOutputKeyClass(Text.class);   // set the map output key type
    conf.setOutputValueClass(Text.class); // set the map output value type
    conf.setMapperClass(MapClass.class);
    //conf.setCombinerClass(Reduce.class); // set the combiner class; if you have no separate combiner, the Reduce class can be reused
    conf.setReducerClass(Reduce.class);
    conf.setInputFormat(SequenceFileInputFormat.class); // necessary when the input is compressed
    From there, the processing is the same as for uncompressed input.
Q4. A question about uploading a local file to HDFS from a Java program
Change this:
FileSystem hdfs = FileSystem.get(config);
to this:
FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);
The first call returns the local file system object; the second is what returns the HDFS file system object. Use the first form only when you actually mean to operate on local files. On 2.7.4 I started out with the same error you have, and switching to the second form fixed it.
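Putting it together, a minimal upload sketch (the namenode address and both paths are placeholders):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // the explicit URI guarantees we get the HDFS file system, not the local one
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);
        hdfs.copyFromLocalFile(new Path("/tmp/local.txt"), new Path("/user/hadoop/local.txt"));
        hdfs.close();
    }
}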
