日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受

【Hadoop】Yarn 狀態(tài)機以及事件機制

簡(jiǎn)介

Yarn采用了基于事件驅動(dòng)的并發(fā)模型:

  • 所有狀態(tài)機都實(shí)現了EventHandler接口,很多服務(wù)(類(lèi)名通常帶有Service后綴)也實(shí)現了該接口,它們都是事件處理器。
  • 需要異步處理的事件由中央異步調度器(類(lèi)名通常帶有Dispatcher后綴)統一接收/派發(fā),需要同步處理的事件直接交給相應的事件處理器。

pic

某些事件處理器不僅處理事件,也會(huì )向中央異步調度器發(fā)送事件。

事件處理器定義

事件處理器定義如下:

@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {

  void handle(T event);

}

只有一個(gè)handler函數,如參是事件:

中央處理器AsyncDispatcher

AsyncDispatcher 實(shí)現了接口Dispatcher,Dispatcher中定義了事件Dispatcher的接口。主要提供兩個(gè)功能:

  • 注冊不同類(lèi)型的事件,主要包含事件類(lèi)型和事件處理器。
  • 獲取事件處理器,用來(lái)派發(fā)事件,等待異步執行真正的EventHandler。
@Public
@Evolving
public interface Dispatcher {

  EventHandler<Event> getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);

}

AsyncDispatcher實(shí)現了Dispatcher接口,也擴展了AbstractService,表明AsyncDispatcher也是一個(gè)服務(wù),
是一個(gè)典型的生產(chǎn)者消費這模型。

public class AsyncDispatcher extends AbstractService implements Dispatcher {
 ...
}

事件處理器的注冊

事件注冊就是將事件寫(xiě)入到eventDispatchers里面,eventDispatchers的定義:Map<Class<? extends Enum>, EventHandler> eventDispatchers,鍵是事件類(lèi)型,value是事件的處理器。

對于同一事件類(lèi)型注冊多次handler處理函數時(shí),將使用MultiListenerHandler代替,MultiListenerHandler里面保存了多個(gè)handler,調用handler函數時(shí),會(huì )依次調用每個(gè)handler。

public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

事件處理

AsyncDispatcher#getEventHandler()是異步派發(fā)的關(guān)鍵:

private final EventHandler<Event> handlerInstance = new GenericEventHandler();

// 省略.....

@Override
public EventHandler<Event> getEventHandler() {
   return handlerInstance;
}

GenericEventHandler:一個(gè)特殊的事件處理器

GenericEventHandler是一個(gè)特殊的事件處理器,用于接受各種事件。由指定線(xiàn)程處理接收到的事件。

public void handle(Event event) {
  if (blockNewEvents) {
    return;
  }
  drained = false;
  /* all this method does is enqueue all the events onto the queue */
  int qSize = eventQueue.size();
  if (qSize != 0 && qSize % 1000 == 0
      && lastEventQueueSizeLogged != qSize) {
    lastEventQueueSizeLogged = qSize;
    LOG.info("Size of event-queue is " + qSize);
  }
  if (qSize != 0 && qSize % detailsInterval == 0
          && lastEventDetailsQueueSizeLogged != qSize) {
    lastEventDetailsQueueSizeLogged = qSize;
    printEventQueueDetails();
    printTrigger = true;
  }
  int remCapacity = eventQueue.remainingCapacity();
  if (remCapacity < 1000) {
    LOG.warn("Very low remaining capacity in the event-queue: "
        + remCapacity);
  }
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    if (!stopped) {
      LOG.warn("AsyncDispatcher thread interrupted", e);
    }
    // Need to reset drained flag to true if event queue is empty,
    // otherwise dispatcher will hang on stop.
    drained = eventQueue.isEmpty();
    throw new YarnRuntimeException(e);
  }
};
  • blockNewEvents: 是否阻塞事件處理,只有當中央處理器停止之后才會(huì )停止接受事件。
  • eventQueue:將接收到的請求放置到當前阻塞隊列里面。方便指定線(xiàn)程及時(shí)處理。

事件處理線(xiàn)程

在服務(wù)啟動(dòng)時(shí)(serviceStart函數)創(chuàng )建一個(gè)線(xiàn)程,會(huì )循環(huán)處理接受到的事件。核心處理邏輯在函數dispatch里面。

Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // 省略。。。
        Event event;
        try {
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          // 省略。。。
          dispatch(event);
          // 省略。。。
        }
      }
    }
  };
}

dispatch詳解

  • 從已經(jīng)注冊的eventDispatchers列表里面查找當前事件對應的處理器,調用當前處理器的handler函數。
  • 如果當前handler處理出現異常時(shí),默認會(huì )退出RM。
protected void dispatch(Event event) {
  //all events go thru this loop
  LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
      event);

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type);
    if(handler != null) {
      handler.handle(event);
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.error(FATAL, "Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      stopped = true;
      Thread shutDownThread = new Thread(createShutDownThread());
      shutDownThread.setName("AsyncDispatcher ShutDown handler");
      shutDownThread.start();
    }
  }
}

狀態(tài)機

狀態(tài)轉換由成員變量StateMachine管理,所有的StateMachine都由StateMachineFactory進(jìn)行管理。由addTransition函數實(shí)現狀態(tài)機。

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.START))

     .installTopology();

Transition定義了“從一個(gè)狀態(tài)轉換到另一個(gè)狀態(tài)”的行為,由轉換操作、開(kāi)始狀態(tài)、事件類(lèi)型、事件組成:

public interface StateMachine
                 <STATE extends Enum<STATE>,
                  EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  public STATE getCurrentState();
  public STATE getPreviousState();
  public STATE doTransition(EVENTTYPE eventType, EVENT event)
        throws InvalidStateTransitionException;
}

ResourceManager中狀態(tài)機

  • RMApp:用于維護一個(gè)Application的生命周期,實(shí)現類(lèi) - RMAppImpl
  • RMAppAttempt:用于維護一次試探運行的生命周期,實(shí)現類(lèi) - RMAppAttemptImpl
  • RMContainer:用于維護一個(gè)已分配的資源最小單位Container的生命周期,實(shí)現類(lèi) - RMContainerImpl
  • RMNode:用于維護一個(gè)NodeManager的生命周期,實(shí)現類(lèi) - RMNodeImpl

NodeManager中狀態(tài)機:

  • Application:用于維護節點(diǎn)上一個(gè)Application的生命周期,實(shí)現類(lèi) - ApplicationImpl
  • Container:用于維護節點(diǎn)上一個(gè)容器的生命周期,實(shí)現類(lèi) - ContainerImpl
  • LocalizedResource:用于維護節點(diǎn)上資源本地化的生命周期,沒(méi)有使用接口即實(shí)現類(lèi) - LocalizedResource



標 題:《【Hadoop】Yarn 狀態(tài)機以及事件機制
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受