in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java [278:565]
public NameNodeRpcServer(Configuration conf, NameNode nn)
    throws IOException {
  this.nn = nn;
  this.namesystem = nn.getNamesystem();
  this.retryCache = namesystem.getRetryCache();
  this.metrics = NameNode.getNameNodeMetrics();

  int handlerCount =
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
          DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
  ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);

  RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
      ProtobufRpcEngine2.class);

  ClientNamenodeProtocolServerSideTranslatorPB
      clientProtocolServerTranslator =
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
  BlockingService clientNNPbService = ClientNamenodeProtocol.
      newReflectiveBlockingService(clientProtocolServerTranslator);

  int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
      IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
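
  // Each server-side translator below adapts a protobuf-generated service
  // interface (a *ProtocolPB class) to this class's implementation; the
  // resulting BlockingService instances are registered on the RPC servers
  // that are built further down.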
  DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
      new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
  BlockingService dnProtoPbService = DatanodeProtocolService
      .newReflectiveBlockingService(dnProtoPbTranslator);

  DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
      new DatanodeLifelineProtocolServerSideTranslatorPB(this);
  BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
      .newReflectiveBlockingService(lifelineProtoPbTranslator);

  NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
      new NamenodeProtocolServerSideTranslatorPB(this);
  BlockingService NNPbService = NamenodeProtocolService
      .newReflectiveBlockingService(namenodeProtocolXlator);

  RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
      new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
  BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
      .newReflectiveBlockingService(refreshAuthPolicyXlator);

  RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
      new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
  BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
      .newReflectiveBlockingService(refreshUserMappingXlator);

  RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
      new RefreshCallQueueProtocolServerSideTranslatorPB(this);
  BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
      .newReflectiveBlockingService(refreshCallQueueXlator);

  GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
      new GenericRefreshProtocolServerSideTranslatorPB(this);
  BlockingService genericRefreshService = GenericRefreshProtocolService
      .newReflectiveBlockingService(genericRefreshXlator);

  GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
      new GetUserMappingsProtocolServerSideTranslatorPB(this);
  BlockingService getUserMappingService = GetUserMappingsProtocolService
      .newReflectiveBlockingService(getUserMappingXlator);

  HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
      new HAServiceProtocolServerSideTranslatorPB(this);
  BlockingService haPbService = HAServiceProtocolService
      .newReflectiveBlockingService(haServiceProtocolXlator);

  ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
      = new ReconfigurationProtocolServerSideTranslatorPB(this);
  BlockingService reconfigurationPbService = ReconfigurationProtocolService
      .newReflectiveBlockingService(reconfigurationProtocolXlator);
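
  // The service RPC endpoint is optional: it is only created when a service
  // RPC address is configured, and it is typically used by DataNodes and
  // other HDFS services so that their traffic is isolated from client RPC
  // load on the main endpoint.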
  InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
  if (serviceRpcAddr != null) {
    String bindHost = nn.getServiceRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = serviceRpcAddr.getHostName();
    }
    LOG.info("Service RPC server is binding to " + bindHost + ":" +
        serviceRpcAddr.getPort());

    int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
            DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
        reconfigurationPbService, serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, serviceRpcServer);
    // We support refreshing the call queue here in case the client RPC queue
    // is full.
    DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, serviceRpcServer);
    DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, serviceRpcServer);

    // Update the address with the correct port
    InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
    serviceRPCAddress = new InetSocketAddress(
        serviceRpcAddr.getHostName(), listenAddr.getPort());
    nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
  } else {
    serviceRpcServer = null;
    serviceRPCAddress = null;
  }
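
  // The lifeline RPC server (when configured) serves DataNode lifeline
  // messages on a dedicated port with its own handler pool, so liveness
  // reports can still get through when the main and service RPC queues are
  // saturated.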
  InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
  if (lifelineRpcAddr != null) {
    RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
        ProtobufRpcEngine2.class);
    String bindHost = nn.getLifelineRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = lifelineRpcAddr.getHostName();
    }
    LOG.info("Lifeline RPC server is binding to {}:{}", bindHost,
        lifelineRpcAddr.getPort());
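
    // If no explicit lifeline handler count is configured, derive it as a
    // fraction of the main handler count, with a floor of one handler.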
    int lifelineHandlerCount = conf.getInt(
        DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
    if (lifelineHandlerCount <= 0) {
      float lifelineHandlerRatio = conf.getFloat(
          DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
          DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
      lifelineHandlerCount = Math.max(
          (int)(handlerCount * lifelineHandlerRatio), 1);
    }
    lifelineRpcServer = new RPC.Builder(conf)
        .setProtocol(HAServiceProtocolPB.class)
        .setInstance(haPbService)
        .setBindAddress(bindHost)
        .setPort(lifelineRpcAddr.getPort())
        .setNumHandlers(lifelineHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();
    DFSUtil.addInternalPBProtocol(conf, DatanodeLifelineProtocolPB.class,
        lifelineProtoPbService, lifelineRpcServer);

    // Update the address with the correct port
    InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
    lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
        listenAddr.getPort());
    nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
  } else {
    lifelineRpcServer = null;
    lifelineRPCAddress = null;
  }
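
  // The client RPC server is always created; it binds the primary NameNode
  // RPC address and carries ClientProtocol traffic in addition to the
  // internal protocols registered below.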
  InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
  String bindHost = nn.getRpcServerBindHost(conf);
  if (bindHost == null) {
    bindHost = rpcAddr.getHostName();
  }
  LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

  boolean enableStateContext = conf.getBoolean(
      DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY,
      DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT);
  LOG.info("Enable NameNode state context:" + enableStateContext);
  GlobalStateIdContext stateIdContext = null;
  if (enableStateContext) {
    stateIdContext = new GlobalStateIdContext(namesystem);
  }

  clientRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(rpcAddr.getPort())
      .setNumHandlers(handlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .setAlignmentContext(stateIdContext)
      .build();

  // Add all the RPC protocols that the namenode implements
  DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
      clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
      reconfigurationPbService, clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
      clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
      clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
      refreshAuthService, clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
      refreshUserMappingService, clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
      refreshCallQueueService, clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
      genericRefreshService, clientRpcServer);
  DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
      getUserMappingService, clientRpcServer);

  // set service-level authorization security policy
  if (serviceAuthEnabled =
      conf.getBoolean(
          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
    clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
    if (serviceRpcServer != null) {
      serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
    }
  }

  // The rpc-server port can be ephemeral... ensure we have the correct info
  InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
  clientRpcAddress = new InetSocketAddress(
      rpcAddr.getHostName(), listenAddr.getPort());
  nn.setRpcServerAddress(conf, clientRpcAddress);

  minimumDataNodeVersion = conf.get(
      DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
      DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
  defaultECPolicyName = conf.getTrimmed(
      DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
      DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);
  // Set terse exceptions whose stack traces won't be logged.
  clientRpcServer.addTerseExceptions(SafeModeException.class,
      FileNotFoundException.class,
      HadoopIllegalArgumentException.class,
      FileAlreadyExistsException.class,
      InvalidPathException.class,
      ParentNotDirectoryException.class,
      UnresolvedLinkException.class,
      AlreadyBeingCreatedException.class,
      QuotaExceededException.class,
      RecoveryInProgressException.class,
      AccessControlException.class,
      InvalidToken.class,
      LeaseExpiredException.class,
      NSQuotaExceededException.class,
      DSQuotaExceededException.class,
      QuotaByStorageTypeExceededException.class,
      AclException.class,
      FSLimitException.PathComponentTooLongException.class,
      FSLimitException.MaxDirectoryItemsExceededException.class,
      DisallowedDatanodeException.class,
      XAttrNotFoundException.class,
      PathIsNotEmptyDirectoryException.class);
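
  // StandbyException and UnresolvedPathException are routine (HA failover
  // and path resolution are handled by retrying clients), so they are
  // suppressed from the server's exception logging rather than merely made
  // terse.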
  clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class,
      UnresolvedPathException.class);

  clientRpcServer.setTracer(nn.tracer);
  if (serviceRpcServer != null) {
    serviceRpcServer.setTracer(nn.tracer);
  }
  if (lifelineRpcServer != null) {
    lifelineRpcServer.setTracer(nn.tracer);
  }
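
  // Any auxiliary ports configured via DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY
  // make the client RPC server listen on additional ports beyond the primary
  // rpc-address (for example, during a port migration).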
  int[] auxiliaryPorts =
      conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
  if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
    for (int auxiliaryPort : auxiliaryPorts) {
      this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
    }
  }
}