日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受

【Hadoop】Yarn 作業(yè)啟動(dòng)源碼解讀

作業(yè)啟動(dòng)

作業(yè)提交的客戶(hù)端比較核心的類(lèi)是Job.java,看作業(yè)啟動(dòng)的源碼需要從這個(gè)類(lèi)開(kāi)始看。

Job.java

作業(yè)啟動(dòng)的入口函數為waitForCompletion函數。當前函數的核心函數為submit(),主要如下:

public void submit() 
      throws IOException, InterruptedException, ClassNotFoundException {
 ensureState(JobState.DEFINE);
 setUseNewAPI();
 connect();
 final JobSubmitter submitter = 
     getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
 status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
   public JobStatus run() throws IOException, InterruptedException, 
   ClassNotFoundException {
     return submitter.submitJobInternal(Job.this, cluster);
   }
 });
 state = JobState.RUNNING;
 LOG.info("The url to track the job: " + getTrackingURL());
}

其中,connect主要為連接ResourceManager。核心提交類(lèi)為submitJobInternal,在submitJobInternal中主要包含:

  • 檢查是否開(kāi)啟分布式緩存,核心函數為:addMRFrameworkToDistributedCache(conf);
  • 從yarn上面獲取Yarn ApplicationId。
  • 將需要上傳的文件拷貝到submitJobDir下面,將上傳的結果添加到指定的配置中。主要實(shí)現在函數copyAndConfigureFiles(job, submitJobDir);里面,主要上傳當前作業(yè)需要的jar包等信息到staging目錄。當上傳Jar包比較頻繁的時(shí)候可以考慮開(kāi)啟分布式緩存。
  • 初始化核心配置,主要實(shí)現在函數:writeConf(conf, submitJobFile);里面。
  • 最后才是真正提交作業(yè)的部分:status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());通過(guò)submitClient.submitJob之后是遠程調用到ResourceManager的類(lèi):YARNRunner.java,開(kāi)始作業(yè)提交。

YARNRunner.java

在當前類(lèi)中,處理邏輯主要包含下面幾步:

  • 創(chuàng )建上下問(wèn)信息:ApplicationSubmissionContext,當前這一步當中主要是構造AM相關(guān)參數,比如AM的啟動(dòng)命令等。在A(yíng)M的啟動(dòng)命令中會(huì )設置AM的啟動(dòng)主函數MRAppMaster,在資源調度到當前作業(yè)時(shí),會(huì )先啟動(dòng)AM的主函數MRAppMaster
  • 提交作業(yè)。最后會(huì )調用到rmClient.submitApplication(request);發(fā)送啟動(dòng)作業(yè)的請求,在發(fā)送請求之后會(huì )一直等到作業(yè)啟動(dòng)完成。啟動(dòng)成功之后會(huì )返回appilicationId

資源調度

Yarn資源調度過(guò)程待完善,后面會(huì )單獨章節學(xué)習。

MRAppMaster.java

當前類(lèi)是啟動(dòng)AM的入口函數,所以要從main函數開(kāi)始讀代碼。main函數里面主要做了下面幾件事:

  • 初始化MRAppMaster實(shí)例。
  • 加載job.xml信息。
  • 初始化web信息。主要包含: MR history server、MR Server。
  • 啟動(dòng)APPMaster。

initAndStartAppMaster:?jiǎn)?dòng)AppMaster

MRAppMaster在yarn內部是一個(gè)服務(wù),最終啟動(dòng)的時(shí)候會(huì )調用到serviceStart函數里面,所以我們主要看這個(gè)函數里面做了什么。

1、創(chuàng )建并且初始化Job

創(chuàng )建Job對象并且將其初始化掉。但是不會(huì )啟動(dòng)當前作業(yè)。

  • 初始化JobImpl對象。在JobImpl初始化的時(shí)候做了下面幾件事:

    • 初始化線(xiàn)程池。

    • 初始化作業(yè)狀態(tài)機的核心代碼如下:

      protected static final
        StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
           stateMachineFactory
         = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
                  (JobStateInternal.NEW)
              // Transitions from NEW state
              .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
                  JobEventType.JOB_DIAGNOSTIC_UPDATE,
                  DIAGNOSTIC_UPDATE_TRANSITION)
              .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
                  JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
              // ....省略...
              .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
                  JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
              // create the topology tables
              .installTopology();
      
      
    • 初始化其他配置。

  • 在中央處理器里面注冊JobFinishEvent類(lèi)型事件以及事件處理的handler。

protected Job createJob(Configuration conf, JobStateInternal forcedState, 
    String diagnostic) {
  // create single job
  Job newJob =
      new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
          taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
          completedTasksFromPreviousRun, metrics,
          committer, newApiCommitter,
          currentUser.getUserName(), appSubmitTime, amInfos, context, 
          forcedState, diagnostic);
  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
  dispatcher.register(JobFinishEvent.Type.class,
      createJobFinishEventHandler());   
  return newJob;
}

2、發(fā)送inited事件

發(fā)送inited事件的對象主要是下面兩個(gè):

  • 通過(guò)dispatcher給歷史AM發(fā)送。
  • 當前AM。代碼如下:
// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
    new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
        .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
        amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
            .getNodeManagerHttpPort(), this.forcedState == null ? null
                : this.forcedState.toString(), appSubmitTime)));

3、創(chuàng )建job init事件,并且處理

創(chuàng )建init事件,核心代碼如下:

JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);

事件處理的核心類(lèi)為InitTransition,核心代碼如下:

public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.metrics.submittedJob(job);
  job.metrics.preparingJob(job);
  // 初始化上下文。
  if (job.newApiCommitter) {
    job.jobContext = new JobContextImpl(job.conf,
        job.oldJobId);
  } else {
    job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
        job.conf, job.oldJobId);
  }
  
  try {
    // 初始化token等信息。
    setup(job);
    job.fs = job.getFileSystem(job.conf);

    //log to job history
    JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
          job.conf.get(MRJobConfig.JOB_NAME, "test"), 
        job.conf.get(MRJobConfig.USER_NAME, "mapred"),
        job.appSubmitTime,
        job.remoteJobConfFile.toString(),
        job.jobACLs, job.queueName,
        job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
        getWorkflowAdjacencies(job.conf),
        job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
    //TODO JH Verify jobACLs, UserName via UGI?
    // 初始化并行度等信息。
    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
    job.numMapTasks = taskSplitMetaInfo.length;
    job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

    if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
      job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
    } else if (job.numMapTasks == 0) {
      job.reduceWeight = 0.9f;
    } else if (job.numReduceTasks == 0) {
      job.mapWeight = 0.9f;
    } else {
      job.mapWeight = job.reduceWeight = 0.45f;
    }

    checkTaskLimits();
  
   // 加載其他參數,具體代碼省略。。

    cleanupSharedCacheUploadPolicies(job.conf);

    // create the Tasks but don't start them yet,, 創(chuàng  )建map task
    createMapTasks(job, inputLength, taskSplitMetaInfo);
    // 創(chuàng  )建reduce tasks
    createReduceTasks(job);

    job.metrics.endPreparingJob(job);
    return JobStateInternal.INITED;
  } catch (Exception e) {
    LOG.warn("Job init failed", e);
    job.metrics.endPreparingJob(job);
    job.addDiagnostic("Job init failed : "
        + StringUtils.stringifyException(e));
    // Leave job in the NEW state. The MR AM will detect that the state is
    // not INITED and send a JOB_INIT_FAILED event.
    return JobStateInternal.NEW;
  }
}

4、檢查初始化結果并且啟動(dòng)作業(yè)

當init成功時(shí),handler返回的結果是JobStateInternal.INITED;如果是失敗了則返回的結果是JobStateInternal.NEW。

對于初始化失敗的作業(yè)會(huì )觸發(fā)JobEventType.JOB_INIT_FAILED事件。

對于初始化成功的作業(yè)會(huì )調用函數startJobs,繼續啟動(dòng)作業(yè)。觸發(fā)

protected void startJobs() {
  /** create a job-start event to get this ball rolling */
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  /** send the job-start event. this triggers the job execution. */
  dispatcher.getEventHandler().handle(startJobEvent);
}

核心處理邏輯如下,主要是觸發(fā)了幾個(gè)事件:

  • JobHistoryEvent:事件處理的handler為JobHistoryEventHandler。
  • JobInfoChangeEvent:
  • CommitterJobSetupEvent:作業(yè)啟動(dòng)的事件,核心處理邏輯在EventProcessor中的函數handleJobSetup中。
public void transition(JobImpl job, JobEvent event) {
  JobStartEvent jse = (JobStartEvent) event;
  if (jse.getRecoveredJobStartTime() != -1L) {
    job.startTime = jse.getRecoveredJobStartTime();
  } else {
    job.startTime = job.clock.getTime();
  }
  JobInitedEvent jie =
    new JobInitedEvent(job.oldJobId,
         job.startTime,
         job.numMapTasks, job.numReduceTasks,
         job.getState().toString(),
         job.isUber());
  job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
  JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
      job.appSubmitTime, job.startTime);
  job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
  job.metrics.runningJob(job);

  job.eventHandler.handle(new CommitterJobSetupEvent(
          job.jobId, job.jobContext));
}

handleJobSetup的核心處理邏輯:

  • 創(chuàng )建attempt路徑。
  • 觸發(fā)JobSetupCompletedEvent事件。從事件實(shí)現來(lái)看會(huì )觸發(fā)JobImpl里面的JOB_SETUP_COMPLETED事件類(lèi)型,由SetupCompletedTransition來(lái)處理當前事件。在當前函數里面會(huì )觸發(fā)JOB_COMPLETED事件。最終會(huì )走到JobImpl的checkReadyForCommit函數里面。
protected void handleJobSetup(CommitterJobSetupEvent event) {
  try {
    // 主要是創(chuàng  )建attempt路徑
    committer.setupJob(event.getJobContext());
    context.getEventHandler().handle(
        new JobSetupCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.warn("Job setup failed", e);
    context.getEventHandler().handle(new JobSetupFailedEvent(
        event.getJobID(), StringUtils.stringifyException(e)));
  }
}

SetupCompletedTransition的處理邏輯如下,可以看到會(huì )定時(shí)啟動(dòng)MapTask和ReduceTask。

public void transition(JobImpl job, JobEvent event) {
  job.setupProgress = 1.0f;
  job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
  job.scheduleTasks(job.reduceTasks, true);

  // If we have no tasks, just transition to job completed
  if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
    job.eventHandler.handle(new JobEvent(job.jobId,
        JobEventType.JOB_COMPLETED));
  }
}

checkReadyForCommit函數的實(shí)現如下,可以看到在觸發(fā)了CommitterJobCommitEvent事件,在CommitterJobCommitEvent里面會(huì )觸發(fā)JOB_COMMIT事件。主要處理邏輯在handleJobCommit里面。

protected JobStateInternal checkReadyForCommit() {
  JobStateInternal currentState = getInternalState();
  if (completedTaskCount == tasks.size()
      && currentState == JobStateInternal.RUNNING) {
    eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
    return JobStateInternal.COMMITTING;
  }
  // return the current state as job not ready to commit yet
  return getInternalState();
}

handleJobCommit處理邏輯如下,

protected void handleJobCommit(CommitterJobCommitEvent event) {
  boolean commitJobIsRepeatable = false;
  try {
    // 檢查作業(yè)是否重復。
    commitJobIsRepeatable = committer.isCommitJobRepeatable(
        event.getJobContext());
  } catch (IOException e) {
    LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
  }

  try {
    // 創(chuàng  )建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_STARTED
    touchz(startCommitFile, commitJobIsRepeatable);
    jobCommitStarted();
    // 檢查和RM的心跳。
    waitForValidCommitWindow();
    // 提交作業(yè),核心處理函數在commitJobInternal里面
    committer.commitJob(event.getJobContext());
    // 創(chuàng  )建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_SUCCESS
    touchz(endCommitSuccessFile, commitJobIsRepeatable);
    context.getEventHandler().handle(
        new JobCommitCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.error("Could not commit job", e);
    try {
      // 失敗之后創(chuàng  )建:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_FAIL
      touchz(endCommitFailureFile, commitJobIsRepeatable);
    } catch (Exception e2) {
      LOG.error("could not create failure file.", e2);
    }
    context.getEventHandler().handle(
        new JobCommitFailedEvent(event.getJobID(),
            StringUtils.stringifyException(e)));
  } finally {
    jobCommitEnded();
  }
}
CommitSucceededTransition

提交成功的事件處理handler為CommitSucceededTransition,核心處理邏輯如下:

job.logJobHistoryFinishedEvent();
job.finished(JobStateInternal.SUCCEEDED);



標 題:《【Hadoop】Yarn 作業(yè)啟動(dòng)源碼解讀
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    1 評論
    2023-12-08 12:03 回復?

    真的看不懂

avatar

取消
日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受