public NameNodeRpcServer()

in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java [278:565]


  /**
   * Builds the RPC servers of the NameNode and registers the protobuf
   * protocol services on them.
   *
   * <p>Up to three servers are created, in this order:
   * <ol>
   *   <li>the service RPC server, only when
   *       {@code nn.getServiceRpcServerAddress(conf)} is non-null;</li>
   *   <li>the lifeline RPC server, only when
   *       {@code nn.getLifelineRpcServerAddress(conf)} is non-null;</li>
   *   <li>the client RPC server, which is always created.</li>
   * </ol>
   * When an optional server is not configured, both its server field and its
   * address field are set to {@code null}. After each server is built, the
   * port reported by its listener is combined with the configured host name
   * and handed back to the NameNode through the matching
   * {@code nn.setRpc...ServerAddress(conf, address)} call.
   *
   * @param conf configuration the handler counts, bind hosts, authorization
   *             flag and other settings are read from; it is also passed to
   *             the RPC builders and to the NameNode address setters
   * @param nn the NameNode that owns this RPC server; supplies the
   *           namesystem, the addresses to bind to and the tracer
   * @throws IOException declared by this constructor and propagated from the
   *                     RPC setup calls it makes
   */
  public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    this.nn = nn;
    this.namesystem = nn.getNamesystem();
    this.retryCache = namesystem.getRetryCache();
    this.metrics = NameNode.getNameNodeMetrics();

    int handlerCount =
        conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
            DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
    ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);

    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine2.class);

    // Wrap this object in one server-side translator per protocol and expose
    // each translator as a protobuf BlockingService. The services are shared
    // by the servers created below.
    ClientNamenodeProtocolServerSideTranslatorPB
        clientProtocolServerTranslator =
            new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol
        .newReflectiveBlockingService(clientProtocolServerTranslator);

    int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
        new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
    BlockingService dnProtoPbService = DatanodeProtocolService
        .newReflectiveBlockingService(dnProtoPbTranslator);

    DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
        new DatanodeLifelineProtocolServerSideTranslatorPB(this);
    BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
        .newReflectiveBlockingService(lifelineProtoPbTranslator);

    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
        new NamenodeProtocolServerSideTranslatorPB(this);
    BlockingService namenodePbService = NamenodeProtocolService
        .newReflectiveBlockingService(namenodeProtocolXlator);

    RefreshAuthorizationPolicyProtocolServerSideTranslatorPB
        refreshAuthPolicyXlator =
            new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
    BlockingService refreshAuthService =
        RefreshAuthorizationPolicyProtocolService
            .newReflectiveBlockingService(refreshAuthPolicyXlator);

    RefreshUserMappingsProtocolServerSideTranslatorPB
        refreshUserMappingXlator =
            new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService refreshUserMappingService =
        RefreshUserMappingsProtocolService
            .newReflectiveBlockingService(refreshUserMappingXlator);

    RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
        new RefreshCallQueueProtocolServerSideTranslatorPB(this);
    BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
        .newReflectiveBlockingService(refreshCallQueueXlator);

    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
        new GenericRefreshProtocolServerSideTranslatorPB(this);
    BlockingService genericRefreshService = GenericRefreshProtocolService
        .newReflectiveBlockingService(genericRefreshXlator);

    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
        new GetUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService getUserMappingService = GetUserMappingsProtocolService
        .newReflectiveBlockingService(getUserMappingXlator);

    HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
        new HAServiceProtocolServerSideTranslatorPB(this);
    BlockingService haPbService = HAServiceProtocolService
        .newReflectiveBlockingService(haServiceProtocolXlator);

    ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
        = new ReconfigurationProtocolServerSideTranslatorPB(this);
    BlockingService reconfigurationPbService = ReconfigurationProtocolService
        .newReflectiveBlockingService(reconfigurationProtocolXlator);

    // ---- Optional service RPC server ----
    InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
    if (serviceRpcAddr != null) {
      // An explicit bind host takes precedence over the address's host name.
      String serviceBindHost = nn.getServiceRpcServerBindHost(conf);
      if (serviceBindHost == null) {
        serviceBindHost = serviceRpcAddr.getHostName();
      }
      LOG.info("Service RPC server is binding to {}:{}", serviceBindHost,
          serviceRpcAddr.getPort());

      int serviceHandlerCount =
          conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
              DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      serviceRpcServer = new RPC.Builder(conf)
          .setProtocol(ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(serviceBindHost)
          .setPort(serviceRpcAddr.getPort())
          .setNumHandlers(serviceHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      // Add all the RPC protocols that the namenode implements
      DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class,
          haPbService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
          reconfigurationPbService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class,
          namenodePbService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class,
          dnProtoPbService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf,
          RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
          refreshUserMappingService, serviceRpcServer);
      // We support Refreshing call queue here in case the client RPC queue
      // is full
      DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
          refreshCallQueueService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
          genericRefreshService, serviceRpcServer);
      DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
          getUserMappingService, serviceRpcServer);

      // Update the address with the correct port
      InetSocketAddress serviceListenAddr =
          serviceRpcServer.getListenerAddress();
      serviceRPCAddress = new InetSocketAddress(
          serviceRpcAddr.getHostName(), serviceListenAddr.getPort());
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    } else {
      serviceRpcServer = null;
      serviceRPCAddress = null;
    }

    // ---- Optional lifeline RPC server ----
    InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
    if (lifelineRpcAddr != null) {
      RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
          ProtobufRpcEngine2.class);
      String lifelineBindHost = nn.getLifelineRpcServerBindHost(conf);
      if (lifelineBindHost == null) {
        lifelineBindHost = lifelineRpcAddr.getHostName();
      }
      LOG.info("Lifeline RPC server is binding to {}:{}", lifelineBindHost,
          lifelineRpcAddr.getPort());

      // A positive configured count is used as is. Otherwise the count is
      // derived from the client handler count and the configured ratio,
      // with a floor of one handler.
      int lifelineHandlerCount = conf.getInt(
          DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
      if (lifelineHandlerCount <= 0) {
        float lifelineHandlerRatio = conf.getFloat(
            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
        lifelineHandlerCount = Math.max(
            (int)(handlerCount * lifelineHandlerRatio), 1);
      }
      lifelineRpcServer = new RPC.Builder(conf)
          .setProtocol(HAServiceProtocolPB.class)
          .setInstance(haPbService)
          .setBindAddress(lifelineBindHost)
          .setPort(lifelineRpcAddr.getPort())
          .setNumHandlers(lifelineHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      DFSUtil.addInternalPBProtocol(conf, DatanodeLifelineProtocolPB.class,
          lifelineProtoPbService, lifelineRpcServer);

      // Update the address with the correct port
      InetSocketAddress lifelineListenAddr =
          lifelineRpcServer.getListenerAddress();
      lifelineRPCAddress = new InetSocketAddress(
          lifelineRpcAddr.getHostName(), lifelineListenAddr.getPort());
      nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
    } else {
      lifelineRpcServer = null;
      lifelineRPCAddress = null;
    }

    // ---- Client RPC server (always created) ----
    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
    String bindHost = nn.getRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = rpcAddr.getHostName();
    }
    LOG.info("RPC server is binding to {}:{}", bindHost, rpcAddr.getPort());

    boolean enableStateContext = conf.getBoolean(
        DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY,
        DFS_NAMENODE_STATE_CONTEXT_ENABLED_DEFAULT);
    LOG.info("Enable NameNode state context:{}", enableStateContext);

    // The alignment context stays null unless state context is enabled.
    GlobalStateIdContext stateIdContext = null;
    if (enableStateContext) {
      stateIdContext = new GlobalStateIdContext(namesystem);
    }

    clientRpcServer = new RPC.Builder(conf)
        .setProtocol(ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(rpcAddr.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .setAlignmentContext(stateIdContext)
        .build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class,
        haPbService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
        reconfigurationPbService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class,
        namenodePbService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class,
        dnProtoPbService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf,
        RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
        refreshCallQueueService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
        genericRefreshService, clientRpcServer);
    DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
    serviceAuthEnabled = conf.getBoolean(
        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
    if (serviceAuthEnabled) {
      clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      if (serviceRpcServer != null) {
        serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
      if (lifelineRpcServer != null) {
        lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
    }

    // The rpc-server port can be ephemeral... ensure we have the correct info
    InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
    clientRpcAddress = new InetSocketAddress(
        rpcAddr.getHostName(), listenAddr.getPort());
    nn.setRpcServerAddress(conf, clientRpcAddress);

    minimumDataNodeVersion = conf.get(
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);

    defaultECPolicyName = conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY,
        DFSConfigKeys.DFS_NAMENODE_EC_SYSTEM_DEFAULT_POLICY_DEFAULT);

    // Set terse exception whose stack trace won't be logged
    clientRpcServer.addTerseExceptions(SafeModeException.class,
        FileNotFoundException.class,
        HadoopIllegalArgumentException.class,
        FileAlreadyExistsException.class,
        InvalidPathException.class,
        ParentNotDirectoryException.class,
        UnresolvedLinkException.class,
        AlreadyBeingCreatedException.class,
        QuotaExceededException.class,
        RecoveryInProgressException.class,
        AccessControlException.class,
        InvalidToken.class,
        LeaseExpiredException.class,
        NSQuotaExceededException.class,
        DSQuotaExceededException.class,
        QuotaByStorageTypeExceededException.class,
        AclException.class,
        FSLimitException.PathComponentTooLongException.class,
        FSLimitException.MaxDirectoryItemsExceededException.class,
        DisallowedDatanodeException.class,
        XAttrNotFoundException.class,
        PathIsNotEmptyDirectoryException.class);

    clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class,
        UnresolvedPathException.class);

    // Every server that was created shares the NameNode's tracer.
    clientRpcServer.setTracer(nn.tracer);
    if (serviceRpcServer != null) {
      serviceRpcServer.setTracer(nn.tracer);
    }
    if (lifelineRpcServer != null) {
      lifelineRpcServer.setTracer(nn.tracer);
    }

    // Additional listener ports are attached to the client RPC server only.
    int[] auxiliaryPorts =
        conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
    if (auxiliaryPorts != null) {
      for (int auxiliaryPort : auxiliaryPorts) {
        this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
      }
    }
  }