BPServiceActor詳解
簡(jiǎn)介
BPServiceActor 主要在DataNode中用于和NameNode溝通的類(lèi)。主要功能如下:
- 與 namenode 進(jìn)行預注冊握手。
- 向 namenode 注冊。
- 定期向 namenode 發(fā)送心跳。
- 處理從 namenode 收到的命令。
核心功能
BPServiceActor的入口函數為start函數,當前類(lèi)本身為runnable接口的實(shí)現類(lèi),所以在start函數里面新建了BPServiceActor線(xiàn)程,并且將其啟動(dòng),
所以其真實(shí)的啟動(dòng)函數為run()
在run函數里面主要做了連接NameNode并且注冊當前DataNode的事。
與NameNode握手
首先要做的就是和NameNode建立連接,核心代碼如下:
bpNamenode = dn.connectToNN(nnAddr);
建立連接之后需要做的就是獲取獲取版本信息并且檢查版本信息,如果單次獲取失敗會(huì )進(jìn)行重試。失敗之后每次的重試間隔為5s。
NamespaceInfo nsInfo = retrieveNamespaceInfo();
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
到此與NameNode之間的握手結束。開(kāi)始注冊當前的DataNode到NameNode。
注冊DataNode
DataNode注冊核心邏輯
注冊DataNode的核心函數是register
void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
DatanodeRegistration newBpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// 向NN注冊DataNodde
newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
newBpRegistration.setNamespaceInfo(nsInfo);
bpRegistration = newBpRegistration;
break;
// ... 省略部分異常處理
}
}
if (bpRegistration == null) {
throw new IOException("DN shut down before block pool registered");
}
// .. 省略 ..
// reset lease id whenever registered to NN.
// ask for a new lease id at the next heartbeat.
fullBlockReportLeaseId = 0;
// random short delay - helps scatter the BR from all DatanodeRegistration
// 定時(shí)匯報block塊信息給NN
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true);
}
NameNode 處理注冊信息
NameNode處理DataNode的注冊信息,主要接口是在NameNodeRpcServer的ipc接口registerDatanode。
其主要的實(shí)現實(shí)在DatanodeManager中的函數registerDatanode。
主要實(shí)現如下:
// update cluster map
getNetworkTopology().remove(nodeS);
if(shouldCountVersion(nodeS)) {
decrementVersionCount(nodeS.getSoftwareVersion());
}
nodeS.updateRegInfo(nodeReg);
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
} else {
nodeS.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
nodeS.setDependentHostNames(
getNetworkDependenciesWithDefault(nodeS));
}
getNetworkTopology().add(nodeS);
resolveUpgradeDomain(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
startAdminOperationIfNecessary(nodeS);
success = true;
主要做了下面兩件事:
- 更新節點(diǎn)信息,比如網(wǎng)絡(luò )拓撲信息。
- 在心跳管理里面注冊當前節點(diǎn)。只要將節點(diǎn)信息添加到節點(diǎn)列表里面即可。
發(fā)送心跳
在DN和NN建立連接并且注冊完成之后,會(huì )定時(shí)向NN發(fā)送心跳信息。入口函數為:offerService。在這個(gè)函數里面控制定時(shí)向NN發(fā)送心跳。
通過(guò)下面類(lèi)似的方式計算是否需要發(fā)送心跳:
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
心跳信息包含下面信息:
- DataNode名稱(chēng)。
- DataNode文件傳輸的端口。
- DataNode的所有容量。
- DataNode的可用容量。
- 慢盤(pán)等信息上報。
心跳的響應類(lèi)是HeartbeatResponse。主要包含下面信息:
- NN的Ha狀態(tài)信息。如果NN的主備信息發(fā)生變化,需要跟新DN中的NN信息。
- NN發(fā)送給DN的所有命令。在offerService里面會(huì )將這些命令異步執行。
- 滾動(dòng)重啟狀態(tài)。
- 最新的LeaseId。
如果NN主動(dòng)要求DN進(jìn)行塊上報,會(huì )在心跳里面觸發(fā)塊上報。
發(fā)送心跳的函數是sendHeartBeat,
