webhdfs詳解
簡(jiǎn)介
hdfs提供了一種除了通過(guò)rpc的方式進(jìn)行文件操作的方式之外,還提供了http的方式對文件進(jìn)行操作的方式:webhdfs。支持HDFS 的完整FileSystem / FileContext接口。
其中Router和NameNode都支持了webhdfs的功能,具體實(shí)現有差別。
使用
文件系統URI與HTTP URL
WebHDFS的文件系統方案為“ webhdfs:// ”。WebHDFS文件系統URI具有以下格式。
webhdfs://<主機>:<HTTP_PORT>/<PATH>
上面的WebHDFS URI對應于下面的HDFS URI。
hdfs://<主機>:<RPC_PORT>/<PATH>
在REST API中,在路徑中插入前綴“ /webhdfs/v1 ”,并在末尾附加查詢(xún)。因此,相應的HTTP URL具有以下格式。
http://<主機>:<HTTP_PORT>/webhdfs/v1/<PATH>?op=create
詳細可以參考:https://hadoop.apache.org/docs/r3.4.1/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
源碼實(shí)現分析
NameNode webhdfs 源碼實(shí)現分析
啟動(dòng)和初始化
在NameNode啟動(dòng)過(guò)程中,啟動(dòng)NameNode的http模塊的時(shí)候,啟動(dòng)了NameNode的webhdfs模塊。核心入口函數(NameNodeHttpServer.java):
void start() throws IOException {
//...
initWebHdfs(conf, bindAddress.getHostName(), httpKeytab, httpServer,
NamenodeWebHdfsMethods.class.getPackage().getName());
//...
}
從上面代碼可以看出webhdfs的核心處理類(lèi)為NamenodeWebHdfsMethods.java。當前類(lèi)是每個(gè)請求都是由一個(gè)NamenodeWebHdfsMethods對象處理的,在處理每個(gè)請求的時(shí)候,需要做下面的初始化:
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// the request object is a proxy to thread-locals so we have to extract
// what we want from it since the external call will be processed in a
// different thread.
scheme = request.getScheme();
userPrincipal = request.getUserPrincipal();
// get the remote address, if coming in via a trusted proxy server then
// the address with be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request);
remotePort = JspHelper.getRemotePort(request);
supportEZ =
Boolean.valueOf(request.getHeader(WebHdfsFileSystem.EZ_HEADER));
}
主要獲取了當前登錄的用戶(hù)的相關(guān)信息,hdfs的nameService以及是否開(kāi)啟EC。
請求處理
NamenodeWebHdfsMethods里面定義的請求類(lèi)型主要是:
-
PUT請求:主要處理寫(xiě)入類(lèi)型的求情。
- CREATE操作。
- MKDIRS操作。
- CREATESYMLINK操作。
- RENAME操作。
- SETREPLICATION操作。
- SETOWNER操作。
- .....
-
DELETE請求:主要處理刪除類(lèi)的請求。主要包含:
- DELETE操作:
- DELETESNAPSHOT操作:
-
GET請求:主要處理查詢(xún)類(lèi)的請求。
-
POST請求:主要處理寫(xiě)入類(lèi)的請求。主要包含:
- APPEND操作。
- CONCAT操作。
- TRUNCATE操作。
- UNSETSTORAGEPOLICY操作。
- UNSETECPOLICY操作。
定義參考如下:
@GET
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Produces({MediaType.APPLICATION_OCTET_STREAM + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8})
public Response get(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
//...
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
snapshotName, oldSnapshotName, tokenKind, tokenService, startAfter);
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
return get(ugi, delegation, username, doAsUser, path.getAbsolutePath(),
op, offset, length, renewer, bufferSize, xattrNames, xattrEncoding,
excludeDatanodes, fsAction, snapshotName, oldSnapshotName,
tokenKind, tokenService, noredirect, startAfter);
}
});
}
webhdfs 操作實(shí)現
1、 CREATE操作
當前操作主要是使用webhdfs上傳文件的操作,核心操作在DN和NN上面都有。在NN里面的操作主要是選擇合適的DN節點(diǎn),然后跳轉到DN上面進(jìn)行文件上傳。
核心處理流程如下:
NN部分源碼實(shí)現
入口位置代碼如下,在 redirectURI里面主要是選擇合適的DN,選擇合適DN的代碼在函數redirectURI里面的chooseDatanode函數,但最終還是有BlockManager提供結果。
case CREATE:
{
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final URI uri = redirectURI(null, namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, unmaskedPermission,
overwrite, bufferSize, replication, blockSize, createParent,
createFlagParam);
if(!noredirectParam.getValue()) {
return Response.temporaryRedirect(uri)
.type(MediaType.APPLICATION_OCTET_STREAM).build();
} else {
final String js = JsonUtil.toJsonString("Location", uri);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
}
下面代碼是為上傳的文件選擇塊的邏輯的入口,選塊的邏輯最后還是有BlockManager完成,核心函數為chooseTarget4WebHDFS。
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(remoteAddr);
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}
}
}
chooseTarget4WebHDF函數里面實(shí)現如下,由此可以看出webhdfs寫(xiě)入的是單副本。
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
return placementPolicies.getPolicy(CONTIGUOUS).chooseTarget(src, 1,
clientnode, Collections.<DatanodeStorageInfo>emptyList(), false,
excludes, blocksize, storagePolicySuite.getDefaultPolicy(), null);
}
