[Hadoop] YARN Job Startup Source Code Walkthrough
Job Startup
The core client-side class for job submission is Job.java, so reading the job startup source code starts from this class.
Job.java
The entry point for job startup is the waitForCompletion method. Its core call is submit(), shown below:
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
Here, connect() mainly establishes the connection to the ResourceManager. The core submission method is submitJobInternal, which does the following:
- Check whether the distributed cache is enabled; the core call is:
  addMRFrameworkToDistributedCache(conf);
- Obtain an ApplicationId from YARN.
- Copy the files to be uploaded into submitJobDir and record the upload results in the configuration. This is implemented in
  copyAndConfigureFiles(job, submitJobDir);
  which mainly uploads the jars and other resources the job needs to the staging directory. If jars are uploaded frequently, consider enabling the distributed cache.
- Write out the core configuration, implemented in:
  writeConf(conf, submitJobFile);
- Finally, the actual job submission:
  status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
After submitClient.submitJob, the call goes through YARNRunner.java, which submits the application to the ResourceManager and the job submission proper begins.
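The client-side sequence above can be sketched as a minimal model. Everything here is a simplified stand-in for the real Hadoop types: only the method names quoted from Job.java (ensureState, connect, submitJobInternal) correspond to real code, and the step strings just label the phases described above.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the client-side submit() sequence described above.
// All classes and fields are simplified stand-ins, not real Hadoop types.
public class SubmitFlowSketch {
    enum JobState { DEFINE, RUNNING }

    static JobState state = JobState.DEFINE;
    static List<String> steps = new ArrayList<>();

    static void ensureState(JobState expected) {
        if (state != expected) throw new IllegalStateException("job not in " + expected);
    }

    // Stand-in for connect(): establishes the client connection to the ResourceManager.
    static void connect() { steps.add("connect-to-RM"); }

    // Stand-in for JobSubmitter.submitJobInternal(): the phases named above.
    static void submitJobInternal() {
        steps.add("add-framework-to-distributed-cache");
        steps.add("get-application-id");
        steps.add("copy-and-configure-files"); // upload jars to the staging dir
        steps.add("write-conf");               // write the job configuration
        steps.add("submit-job");               // submitClient.submitJob(...)
    }

    static void submit() {
        ensureState(JobState.DEFINE);
        connect();
        submitJobInternal();
        state = JobState.RUNNING;
    }

    public static void main(String[] args) {
        submit();
        System.out.println(steps);
        System.out.println(state);
    }
}
```

The point of the sketch is the ordering: the state check and RM connection happen before any file upload, and the state only flips to RUNNING after submitJob returns.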
YARNRunner.java
The processing logic in this class mainly consists of the following steps:
- Create the context information, ApplicationSubmissionContext. This step mainly builds the AM-related parameters, such as the AM launch command. That command sets MRAppMaster as the AM's main class, so when the scheduler allocates resources to this application, MRAppMaster's main method is launched first.
- Submit the job. This eventually calls
  rmClient.submitApplication(request);
  to send the application start request, then waits until the application has started. On success the applicationId is returned.
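The first step above, building the AM launch command, can be sketched roughly as follows. The real code assembles a ContainerLaunchContext inside the ApplicationSubmissionContext; here only the main class name org.apache.hadoop.mapreduce.v2.app.MRAppMaster is the real value, while the JVM flags, log redirections, and helper names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of how YARNRunner assembles the AM launch command.
// Only the MRAppMaster class name is real; flags and paths are illustrative.
public class AmCommandSketch {
    public static List<String> buildAmCommand(int amMemoryMb) {
        List<String> cmd = new ArrayList<>();
        cmd.add("$JAVA_HOME/bin/java");
        cmd.add("-Xmx" + amMemoryMb + "m");
        // The AM entry point set by YARNRunner:
        cmd.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
        // Container stdout/stderr are redirected into the container log dir.
        cmd.add("1><LOG_DIR>/stdout");
        cmd.add("2><LOG_DIR>/stderr");
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", buildAmCommand(1024)));
    }
}
```

When the RM eventually grants a container for this application, the NodeManager executes this command, which is why MRAppMaster.main is the next place to read.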
Resource Scheduling
The YARN resource scheduling process is left for now; it will be covered in a separate chapter later.
MRAppMaster.java
This class is the entry point for starting the AM, so start reading from its main function, which mainly does the following:
- Initialize the MRAppMaster instance.
- Load the job.xml information.
- Initialize the web services, mainly the MR history server and the MR client service.
- Start the AppMaster.
initAndStartAppMaster: starting the AppMaster
Inside YARN, MRAppMaster is a service, so startup eventually reaches its serviceStart method; that is the function to examine.
1. Create and initialize the Job
Create the Job object and initialize it; the job is not started at this point.
- Construct the JobImpl object. During JobImpl construction the following happens:
  - Initialize the thread pool.
  - Initialize the job state machine; the core code is:
protected static final StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
    stateMachineFactory =
    new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
        // Transitions from NEW state
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
            JobEventType.JOB_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
            JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
        // ... omitted ...
        .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
            JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
        // create the topology tables
        .installTopology();
  - Initialize other configuration.
- Register the JobFinishEvent event type and its event handler with the central dispatcher:
protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  // create single job
  Job newJob =
      new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
          taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
          completedTasksFromPreviousRun, metrics,
          committer, newApiCommitter,
          currentUser.getUserName(), appSubmitTime, amInfos, context,
          forcedState, diagnostic);
  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

  dispatcher.register(JobFinishEvent.Type.class,
      createJobFinishEventHandler());
  return newJob;
}
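The table-driven state machine built by stateMachineFactory above can be illustrated with a minimal self-contained sketch. The fluent addTransition(...) chain mirrors the real StateMachineFactory API, but the class itself, the lookup keying, and the reduced state/event sets are simplified stand-ins for JobStateInternal and JobEventType.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal table-driven state machine in the spirit of Hadoop's StateMachineFactory.
// States/events mirror a few JobStateInternal/JobEventType values; the class
// itself is a simplified stand-in, not the real Hadoop implementation.
public class JobStateMachineSketch {
    enum State { NEW, INITED, RUNNING }
    enum Event { JOB_INIT, JOB_START, JOB_DIAGNOSTIC_UPDATE }

    // (currentState, eventType) -> nextState
    private final Map<String, State> table = new HashMap<>();
    private State current;

    JobStateMachineSketch(State initial) { this.current = initial; }

    // Fluent registration, like the addTransition(...) chains in JobImpl.
    JobStateMachineSketch addTransition(State from, State to, Event on) {
        table.put(from + "/" + on, to);
        return this;
    }

    State handle(Event e) {
        State next = table.get(current + "/" + e);
        if (next == null) throw new IllegalStateException("invalid event " + e + " in " + current);
        current = next;
        return current;
    }

    public static void main(String[] args) {
        JobStateMachineSketch sm = new JobStateMachineSketch(State.NEW)
            .addTransition(State.NEW, State.NEW, Event.JOB_DIAGNOSTIC_UPDATE)
            .addTransition(State.NEW, State.INITED, Event.JOB_INIT)
            .addTransition(State.INITED, State.RUNNING, Event.JOB_START);
        sm.handle(Event.JOB_INIT);
        System.out.println(sm.handle(Event.JOB_START));
    }
}
```

Self-loop transitions such as NEW → NEW on JOB_DIAGNOSTIC_UPDATE show why the real table has so many entries: most events are legal in several states without changing them.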
2. Send the inited event
The inited event is sent to two targets:
- The job history service, via the dispatcher.
- The current AM. The code is as follows:
// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
    new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
        .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
        amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
        .getNodeManagerHttpPort(), this.forcedState == null ? null
            : this.forcedState.toString(), appSubmitTime)));
3. Create and handle the job init event
The init event is created as follows:
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);
The core class handling this event is InitTransition; its core code is:
public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.metrics.submittedJob(job);
  job.metrics.preparingJob(job);

  // Initialize the job context.
  if (job.newApiCommitter) {
    job.jobContext = new JobContextImpl(job.conf,
        job.oldJobId);
  } else {
    job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
        job.conf, job.oldJobId);
  }

  try {
    // Initialize tokens and related setup.
    setup(job);
    job.fs = job.getFileSystem(job.conf);

    // log to job history
    JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
        job.conf.get(MRJobConfig.JOB_NAME, "test"),
        job.conf.get(MRJobConfig.USER_NAME, "mapred"),
        job.appSubmitTime,
        job.remoteJobConfFile.toString(),
        job.jobACLs, job.queueName,
        job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
        getWorkflowAdjacencies(job.conf),
        job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
    //TODO JH Verify jobACLs, UserName via UGI?

    // Initialize the degree of parallelism (input splits).
    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
    job.numMapTasks = taskSplitMetaInfo.length;
    job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

    if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
      job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
    } else if (job.numMapTasks == 0) {
      job.reduceWeight = 0.9f;
    } else if (job.numReduceTasks == 0) {
      job.mapWeight = 0.9f;
    } else {
      job.mapWeight = job.reduceWeight = 0.45f;
    }

    checkTaskLimits();

    // Load other parameters (code omitted).
    cleanupSharedCacheUploadPolicies(job.conf);

    // create the Tasks but don't start them yet: map tasks
    createMapTasks(job, inputLength, taskSplitMetaInfo);
    // reduce tasks
    createReduceTasks(job);

    job.metrics.endPreparingJob(job);
    return JobStateInternal.INITED;
  } catch (Exception e) {
    LOG.warn("Job init failed", e);
    job.metrics.endPreparingJob(job);
    job.addDiagnostic("Job init failed : "
        + StringUtils.stringifyException(e));
    // Leave job in the NEW state. The MR AM will detect that the state is
    // not INITED and send a JOB_INIT_FAILED event.
    return JobStateInternal.NEW;
  }
}
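Events like JOB_INIT above are not handled by direct method calls: they go through a central dispatcher that routes each event type to its registered handler, which is what dispatcher.register(...) in createJob sets up. The following is a minimal synchronous sketch of that register/handle pattern; the real AsyncDispatcher queues events and serves them from a dedicated thread, and all names here are simplified stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Minimal synchronous sketch of the central dispatcher pattern:
// handlers register per event type; handle() routes events to them.
// The real AsyncDispatcher is asynchronous (queue + dispatch thread).
public class DispatcherSketch {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    void register(String eventType, Consumer<String> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    void handle(String eventType, String payload) {
        for (Consumer<String> h : handlers.getOrDefault(eventType, List.of())) {
            h.accept(payload);
        }
    }

    public static void main(String[] args) {
        DispatcherSketch d = new DispatcherSketch();
        List<String> log = new ArrayList<>();
        d.register("JOB_INIT", p -> log.add("init " + p));
        d.register("JOB_FINISH", p -> log.add("finish " + p));
        d.handle("JOB_INIT", "job_1");
        System.out.println(log);
    }
}
```

The decoupling matters: InitTransition never calls the history service or the committer directly, it only emits events, and whoever registered for that type reacts.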
4. Check the init result and start the job
When init succeeds, the handler returns JobStateInternal.INITED; on failure it returns JobStateInternal.NEW.
For a job whose initialization failed, a JobEventType.JOB_INIT_FAILED event is fired.
For a job whose initialization succeeded, startJobs is called to continue starting the job:
protected void startJobs() {
  /** create a job-start event to get this ball rolling */
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  /** send the job-start event. this triggers the job execution. */
  dispatcher.getEventHandler().handle(startJobEvent);
}
The core handling logic, shown below, mainly fires several events:
- JobHistoryEvent: handled by JobHistoryEventHandler.
- JobInfoChangeEvent: also wrapped in a JobHistoryEvent for the history service.
- CommitterJobSetupEvent: the job setup event; the core handling logic is in the handleJobSetup function of the committer's EventProcessor.
public void transition(JobImpl job, JobEvent event) {
  JobStartEvent jse = (JobStartEvent) event;
  if (jse.getRecoveredJobStartTime() != -1L) {
    job.startTime = jse.getRecoveredJobStartTime();
  } else {
    job.startTime = job.clock.getTime();
  }
  JobInitedEvent jie =
      new JobInitedEvent(job.oldJobId,
          job.startTime,
          job.numMapTasks, job.numReduceTasks,
          job.getState().toString(),
          job.isUber());
  job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
  JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
      job.appSubmitTime, job.startTime);
  job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
  job.metrics.runningJob(job);
  job.eventHandler.handle(new CommitterJobSetupEvent(
      job.jobId, job.jobContext));
}
The core logic of handleJobSetup:
- Create the attempt path.
- Fire the JobSetupCompletedEvent. Following the event implementation, this triggers the JOB_SETUP_COMPLETED event type in JobImpl, handled by SetupCompletedTransition. That transition can fire the JOB_COMPLETED event, and the flow eventually reaches JobImpl's checkReadyForCommit function.
protected void handleJobSetup(CommitterJobSetupEvent event) {
  try {
    // mainly creates the attempt path
    committer.setupJob(event.getJobContext());
    context.getEventHandler().handle(
        new JobSetupCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.warn("Job setup failed", e);
    context.getEventHandler().handle(new JobSetupFailedEvent(
        event.getJobID(), StringUtils.stringifyException(e)));
  }
}
SetupCompletedTransition's logic is shown below; it schedules the MapTasks and ReduceTasks for launch.
public void transition(JobImpl job, JobEvent event) {
  job.setupProgress = 1.0f;
  job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
  job.scheduleTasks(job.reduceTasks, true);

  // If we have no tasks, just transition to job completed
  if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
    job.eventHandler.handle(new JobEvent(job.jobId,
        JobEventType.JOB_COMPLETED));
  }
}
The implementation of checkReadyForCommit is shown below. It fires the CommitterJobCommitEvent, whose handling in turn triggers the job commit; the main logic lives in handleJobCommit.
protected JobStateInternal checkReadyForCommit() {
  JobStateInternal currentState = getInternalState();
  if (completedTaskCount == tasks.size()
      && currentState == JobStateInternal.RUNNING) {
    eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
    return JobStateInternal.COMMITTING;
  }
  // return the current state as job not ready to commit yet
  return getInternalState();
}
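The readiness condition can be isolated into a tiny testable sketch: commit is triggered only when every task has completed and the job is still RUNNING, otherwise the current state is returned unchanged. The names below are simplified stand-ins for JobStateInternal and the JobImpl fields.

```java
// Sketch of the commit-readiness check above. Names are simplified stand-ins;
// the real method also fires CommitterJobCommitEvent when it returns COMMITTING.
public class CommitCheckSketch {
    enum State { RUNNING, COMMITTING, FAIL_ABORT }

    static State checkReadyForCommit(int completedTaskCount, int totalTasks, State current) {
        if (completedTaskCount == totalTasks && current == State.RUNNING) {
            return State.COMMITTING;
        }
        return current; // not ready yet: keep the current state
    }

    public static void main(String[] args) {
        System.out.println(checkReadyForCommit(10, 10, State.RUNNING));
        System.out.println(checkReadyForCommit(9, 10, State.RUNNING));
    }
}
```

Note the second guard: even with all tasks complete, a job already in a terminal or failing state must not start committing.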
The logic of handleJobCommit is as follows:
protected void handleJobCommit(CommitterJobCommitEvent event) {
  boolean commitJobIsRepeatable = false;
  try {
    // Check whether the job commit is repeatable (i.e. safe to retry).
    commitJobIsRepeatable = committer.isCommitJobRepeatable(
        event.getJobContext());
  } catch (IOException e) {
    LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
  }

  try {
    // Create the file /tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_STARTED
    touchz(startCommitFile, commitJobIsRepeatable);
    jobCommitStarted();
    // Wait for a valid commit window (heartbeat with the RM).
    waitForValidCommitWindow();
    // Commit the job; the core logic is in commitJobInternal.
    committer.commitJob(event.getJobContext());
    // Create the file /tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_SUCCESS
    touchz(endCommitSuccessFile, commitJobIsRepeatable);
    context.getEventHandler().handle(
        new JobCommitCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.error("Could not commit job", e);
    try {
      // On failure, create /tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_FAIL
      touchz(endCommitFailureFile, commitJobIsRepeatable);
    } catch (Exception e2) {
      LOG.error("could not create failure file.", e2);
    }
    context.getEventHandler().handle(
        new JobCommitFailedEvent(event.getJobID(),
            StringUtils.stringifyException(e)));
  } finally {
    jobCommitEnded();
  }
}
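The COMMIT_STARTED / COMMIT_SUCCESS / COMMIT_FAIL marker-file protocol above can be sketched with local files standing in for the HDFS staging directory. The three file names match handleJobCommit; the helper method and the use of a temp dir are illustrative assumptions, and the real touchz also handles the already-exists case for repeatable commits.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the commit marker-file protocol from handleJobCommit, using a
// local temp dir instead of the HDFS staging directory. Only the three
// marker file names are taken from the real code.
public class CommitMarkerSketch {
    static void touchz(Path p) throws IOException { Files.createFile(p); }

    static boolean commitWithMarkers(Path stagingDir, Runnable commitJob) throws IOException {
        touchz(stagingDir.resolve("COMMIT_STARTED"));
        try {
            commitJob.run(); // stand-in for committer.commitJob(...)
            touchz(stagingDir.resolve("COMMIT_SUCCESS"));
            return true;
        } catch (RuntimeException e) {
            touchz(stagingDir.resolve("COMMIT_FAIL"));
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("staging");
        boolean ok = commitWithMarkers(dir, () -> { /* commit work */ });
        System.out.println(ok + " " + Files.exists(dir.resolve("COMMIT_SUCCESS")));
    }
}
```

The markers exist so a restarted AM can tell whether a previous attempt died before, during, or after the commit, and decide whether re-committing is safe.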
CommitSucceededTransition
The handler for the commit-succeeded event is CommitSucceededTransition; its core logic is:
job.logJobHistoryFinishedEvent();
job.finished(JobStateInternal.SUCCEEDED);