【Hadoop】Yarn 狀態(tài)機以及事件機制
簡(jiǎn)介
Yarn采用了基于事件驅動(dòng)的并發(fā)模型:
- 所有狀態(tài)機都實(shí)現了EventHandler接口,很多服務(wù)(類(lèi)名通常帶有Service后綴)也實(shí)現了該接口,它們都是事件處理器。
- 需要異步處理的事件由中央異步調度器(類(lèi)名通常帶有Dispatcher后綴)統一接收/派發(fā),需要同步處理的事件直接交給相應的事件處理器。
某些事件處理器不僅處理事件,也會(huì )向中央異步調度器發(fā)送事件。
事件處理器定義
事件處理器定義如下:
@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {
void handle(T event);
}
只有一個(gè)handler函數,如參是事件:
中央處理器AsyncDispatcher
AsyncDispatcher 實(shí)現了接口Dispatcher,Dispatcher中定義了事件Dispatcher的接口。主要提供兩個(gè)功能:
- 注冊不同類(lèi)型的事件,主要包含事件類(lèi)型和事件處理器。
- 獲取事件處理器,用來(lái)派發(fā)事件,等待異步執行真正的EventHandler。
@Public
@Evolving
public interface Dispatcher {
EventHandler<Event> getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
AsyncDispatcher實(shí)現了Dispatcher接口,也擴展了AbstractService,表明AsyncDispatcher也是一個(gè)服務(wù),
是一個(gè)典型的生產(chǎn)者消費這模型。
public class AsyncDispatcher extends AbstractService implements Dispatcher {
...
}
事件處理器的注冊
事件注冊就是將事件寫(xiě)入到eventDispatchers里面,eventDispatchers的定義:Map<Class<? extends Enum>, EventHandler> eventDispatchers
,鍵是事件類(lèi)型,value是事件的處理器。
對于同一事件類(lèi)型注冊多次handler處理函數時(shí),將使用MultiListenerHandler代替,MultiListenerHandler里面保存了多個(gè)handler,調用handler函數時(shí),會(huì )依次調用每個(gè)handler。
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
事件處理
AsyncDispatcher#getEventHandler()是異步派發(fā)的關(guān)鍵:
private final EventHandler<Event> handlerInstance = new GenericEventHandler();
// 省略.....
@Override
public EventHandler<Event> getEventHandler() {
return handlerInstance;
}
GenericEventHandler:一個(gè)特殊的事件處理器
GenericEventHandler是一個(gè)特殊的事件處理器,用于接受各種事件。由指定線(xiàn)程處理接收到的事件。
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
- blockNewEvents: 是否阻塞事件處理,只有當中央處理器停止之后才會(huì )停止接受事件。
- eventQueue:將接收到的請求放置到當前阻塞隊列里面。方便指定線(xiàn)程及時(shí)處理。
事件處理線(xiàn)程
在服務(wù)啟動(dòng)時(shí)(serviceStart函數)創(chuàng )建一個(gè)線(xiàn)程,會(huì )循環(huán)處理接受到的事件。核心處理邏輯在函數dispatch里面。
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// 省略。。。
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
// 省略。。。
dispatch(event);
// 省略。。。
}
}
}
};
}
dispatch詳解
- 從已經(jīng)注冊的eventDispatchers列表里面查找當前事件對應的處理器,調用當前處理器的handler函數。
- 如果當前handler處理出現異常時(shí),默認會(huì )退出RM。
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
event);
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.error(FATAL, "Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
狀態(tài)機
狀態(tài)轉換由成員變量StateMachine管理,所有的StateMachine都由StateMachineFactory進(jìn)行管理。由addTransition函數實(shí)現狀態(tài)機。
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.START))
.installTopology();
Transition定義了“從一個(gè)狀態(tài)轉換到另一個(gè)狀態(tài)”的行為,由轉換操作、開(kāi)始狀態(tài)、事件類(lèi)型、事件組成:
public interface StateMachine
<STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
public STATE getCurrentState();
public STATE getPreviousState();
public STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException;
}
ResourceManager中狀態(tài)機
- RMApp:用于維護一個(gè)Application的生命周期,實(shí)現類(lèi) - RMAppImpl
- RMAppAttempt:用于維護一次試探運行的生命周期,實(shí)現類(lèi) - RMAppAttemptImpl
- RMContainer:用于維護一個(gè)已分配的資源最小單位Container的生命周期,實(shí)現類(lèi) - RMContainerImpl
- RMNode:用于維護一個(gè)NodeManager的生命周期,實(shí)現類(lèi) - RMNodeImpl
NodeManager中狀態(tài)機:
- Application:用于維護節點(diǎn)上一個(gè)Application的生命周期,實(shí)現類(lèi) - ApplicationImpl
- Container:用于維護節點(diǎn)上一個(gè)容器的生命周期,實(shí)現類(lèi) - ContainerImpl
- LocalizedResource:用于維護節點(diǎn)上資源本地化的生命周期,沒(méi)有使用接口即實(shí)現類(lèi) - LocalizedResource
