DistCp源碼解析
說(shuō)明
DistCp(分布式拷貝)是用于大規模集群內部和集群之間拷貝的工具。
它使用Map/Reduce實(shí)現文件分發(fā),錯誤處理和恢復,以及報告生成。
它把文件和目錄的列表作為map任務(wù)的輸入,每個(gè)任務(wù)會(huì )完成源列表中部分文件的拷貝。
由于使用了Map/Reduce方法,這個(gè)工具在語(yǔ)義和執行上都會(huì )有特殊的地方。 這篇文檔會(huì )為常用DistCp操作提供指南并闡述它的工作模型。
源碼詳解
作業(yè)啟動(dòng)
作業(yè)的啟動(dòng)主要包含初始化和作業(yè)提交,在初始化階段主要是list左右需要拷貝的文件信息,根據文件信息構造split信息。
作業(yè)提交階段就是根據初始化階段構造的split信息,將作業(yè)提交到Yarn上面。
作業(yè)初始化
初始化階段主要是list左右需要拷貝的文件信息,根據文件信息構造split信息。
DistCp的入口函數是main函數,在main函數里面主要做了兩件事:
- 注冊Cleanup。
- 初始化和啟動(dòng)作業(yè),核心處理函數為execute函數里面的createAndSubmitJob
創(chuàng )建Job對象,主要是指定Map的處理類(lèi),InputFormat 和outputFormat 信息:
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
根據需要拷貝的目錄獲取所有的文件信息。支持snapshot模式和普通模式。
snapshot 模式
核心函數為SimpleCopyListing.doBuildListingWithSnapshotDiff。主要是通過(guò)DistCpSync.getAllDiffs獲取Snapshot的差異文件。
差異文件主要包含創(chuàng )建、修改、刪除類(lèi)型,將差別的的文件輸出到fileList.seq文件里面。fileList.seq文件在staging目錄下面的的_distcp_隨機的int值
。
普通模式
核心函數為SimpleCopyListing.doBuildListing。對于非snapshot模式,核心處理邏輯就是通過(guò)list將所有的文件獲取出來(lái)。添加到fileList.seq里面。
對于XAttrs等權限信息也會(huì )按照-p參數指定的來(lái)獲取。
作業(yè)提交
由于DistCp也是MapReduce作業(yè),所以作業(yè)提交沿用了MapReduce作業(yè)提交的框架,對于Map和Reduce的處理類(lèi),
以及InputFormat和outputFormat都是DistCp自己實(shí)現的。
其中比較常用的是DynamicInputFormat,DynamicInputFormat主要是通過(guò)主要是按照文件數量分配的。
作業(yè)運行
AM運行
在創(chuàng )建作業(yè)的時(shí)候定義了outputFormat,在CopyOutputFormat中定義了getOutputCommitter。
job.setOutputFormatClass(CopyOutputFormat.class);
Distcp的AM結束時(shí)的核心處理類(lèi)是CopyCommitter。結束的時(shí)候會(huì )調用commitJob函數,在commitJob函數里面。
deleteMissing函數
從目標端刪除多余的文件,需要配置-delete參數。
preserveFileAttributesForDirectories函數
Map運行
Reduce運行
