ContainerManager詳解
簡(jiǎn)介
ContainerManager主要負責NM中管理所有Container生命周期,其主要包含啟動(dòng)Container、恢復Container、停止Container等功能。
主要功能由ContainerManagerImpl類(lèi)實(shí)現,具體代碼可以參考當前類(lèi)。
初始化
初始化主要分為兩部分:
ContainerManagerImpl實(shí)例的構造函數和serviceInit函數。
構造函數
當前函數為構造函數,主要初始化必須要的一些變量等。
- dispatcher : 事件的中央調度器,主要用于管理單個(gè)Container的生命周期。在當前函數里面會(huì )通過(guò)
dispatcher.register()
注冊支持的事件。 - containersLauncher: 主要用于啟動(dòng)Container,可以通過(guò)配置項yarn.nodemanager.containers-launcher.class指定。默認為containersLauncher.class
serviceInit函數
主要是服務(wù)啟動(dòng)時(shí)的初始化函數,ContainerManager在NodeManager內部屬于一個(gè)服務(wù)。所以初始化的時(shí)候會(huì )調用這個(gè)函數初始化一些服務(wù)相關(guān)的東西。
在這個(gè)函數里面總結下來(lái)主要做了幾件事:
- 繼續初始化一些事件,主要包含LogHandlerEventType、SharedCacheUploadEventType。
- 初始化AMRMProxyService。
- 從本地的LevelDB恢復Container信息。
恢復當前NodeManager的所有作業(yè)信息
第一步:恢復Application 信息
首先是從LevelDB里面加載Application信息。循環(huán)加載。
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
appsState.getIterator()) {
while (rasIterator.hasNext()) {
ContainerManagerApplicationProto proto = rasIterator.next();
LOG.debug("Recovering application with state: {}", proto);
recoverApplication(proto);
}
}
加載Application的時(shí)候會(huì )將Application的上下文信息從LevelDB里面讀出來(lái),通過(guò)上下文信息等初始化新的ApplicationImpl,并且觸發(fā)ApplicationInitEvent事件。
會(huì )根據當前作業(yè)上下文中實(shí)際的狀態(tài)等信息跳轉到實(shí)際的狀態(tài)。
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
metrics.runningApplication();
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
第二步:恢復所有的Container信息
第二步是從LevelDB里面加載Container信息。循環(huán)加載。
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
stateStore.getContainerStateIterator()) {
while (rcsIterator.hasNext()) {
RecoveredContainerState rcs = rcsIterator.next();
LOG.debug("Recovering container with state: {}", rcs);
recoverContainer(rcs);
}
}
recoverContainer函數用于恢復單個(gè)Container信息。對于已經(jīng)存在的Application對應的Container會(huì )通過(guò)LevelDB里面加載到的信息初始化Container對象,
將其加到所有Container的列表里面并且觸發(fā)ApplicationContainerInitEvent,后續會(huì )根據實(shí)際狀態(tài)信息跳轉到指定狀態(tài)繼續處理。
Container container = new ContainerImpl(getConfig(), dispatcher,
launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container);
containerScheduler.recoverActiveContainer(container, rcs);
app.handle(new ApplicationContainerInitEvent(container));
如果發(fā)現作業(yè)的狀態(tài)為KILL狀態(tài),則會(huì )為當前Container重新觸發(fā)KILL事件,保證Container已經(jīng)停止。
對于A(yíng)pplication找不見(jiàn)的Container,認為作業(yè)已經(jīng)結束了,直接標記為已經(jīng)完成。
在恢復完成之后會(huì )觸發(fā)事件: ContainerSchedulerEventType.RECOVERY_COMPLETED
此狀態(tài)會(huì )重新拉起所有的Container。
啟動(dòng)Containers
獲取NMToken
在Container啟動(dòng)之前需要獲取NMToken,可以通過(guò)下面命令獲取,一般情況下獲取第一個(gè)NMTokenIdentifier類(lèi)型的Token。
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
開(kāi)始啟動(dòng)Container
啟動(dòng)之前需要做的就是初始化ContainerImpl信息,方便后續啟動(dòng)Container。
Container container =
new ContainerImpl(getConfig(), this.dispatcher,
launchContext, credentials, metrics, containerTokenIdentifier,
context, containerStartTime);
如果是第一次啟動(dòng)(滿(mǎn)足條件:!context.getApplications().containsKey(applicationID))
,也就是AM,會(huì )通過(guò)下面命令觸發(fā)作業(yè)的啟動(dòng):
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls,
logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
滿(mǎn)足下面條件則是恢復Application:
containerTokenIdentifier.getContainerType() == ContainerType.APPLICATION_MASTER && context.getApplications().containsKey(applicationID))
啟動(dòng)Container主要是觸發(fā)Container的Init事件:
this.context.getNMStateStore().storeContainer(containerId,
containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
停止Container
獲取NMToken
在Container停止之前需要獲取NMToken,可以通過(guò)下面命令獲取,一般情況下獲取第一個(gè)NMTokenIdentifier類(lèi)型的Token。和啟動(dòng)時(shí)候的類(lèi)似。
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();
停止作業(yè)
停止作業(yè)的時(shí)候會(huì )優(yōu)先通過(guò)context.getContainers().get(containerID)
獲取Container信息。
如下場(chǎng)景會(huì )拋出異常:
- 對于查詢(xún)不到的Container,如果不是最新停止的,則會(huì )拋出異常。
- 對于正在恢復的Container,不會(huì )接受停止,會(huì )拋出異常。
停止作業(yè)的核心邏輯如下,核心思想是觸發(fā)KILL事件。具體可以參見(jiàn)Container事件處理
context.getNMStateStore().storeContainerKilled(containerID);
container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
"Container killed by the ApplicationMaster.");
Container事件處理
Container啟動(dòng)的開(kāi)始是從事件InitContainerTransition。由當前事件通過(guò)狀態(tài)機轉到其他狀態(tài)。
不錯