public RegisterNodeManagerResponse registerNodeManager()

in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java [404:622]


  /**
   * Handles a NodeManager registration request and decides whether the node
   * may join the cluster.
   *
   * <p>The request is validated in the following order; the first failing
   * check returns a response whose node action is
   * {@link NodeAction#SHUTDOWN} together with a diagnostics message:
   * <ol>
   *   <li>NodeManager version against {@code minimumNodeManagerVersion}
   *       (skipped when that value is {@code "NONE"});</li>
   *   <li>hostname resolution of the reported host, only when
   *       {@code checkIpHostnameInRegistration} is set;</li>
   *   <li>{@code nodesListManager.isValidNode(host)}, unless the node is
   *       currently in decommissioning;</li>
   *   <li>the node capability (possibly replaced by a value loaded through
   *       {@code loadNodeResourceFromDRConfiguration}) against
   *       {@code minAllocMb} and {@code minAllocVcores}.</li>
   * </ol>
   *
   * <p>When all checks pass, a new {@link RMNodeImpl} is created and either
   * inserted into the RM node map (first registration) or handled as a
   * reconnect of an already known node id. Node labels and node attributes
   * carried by the request are then processed, and the response is completed
   * with {@link NodeAction#NORMAL}, the current container-token and NM-token
   * master keys, the RM identifier and the RM version.
   *
   * @param request registration request sent by the NodeManager
   * @return the registration response; its node action is either
   *         {@code SHUTDOWN} (rejected, with diagnostics) or {@code NORMAL}
   * @throws YarnException declared by the signature; which callee raises it
   *         is not visible in this method
   * @throws IOException declared by the signature; IOExceptions from the
   *         node-label and node-attribute updates are caught below and
   *         reported through the response diagnostics instead
   */
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
      IOException {
    NodeId nodeId = request.getNodeId();
    String host = nodeId.getHost();
    int cmPort = nodeId.getPort();
    int httpPort = request.getHttpPort();
    Resource capability = request.getResource();
    String nodeManagerVersion = request.getNMVersion();
    Resource physicalResource = request.getPhysicalResource();
    NodeStatus nodeStatus = request.getNodeStatus();

    RegisterNodeManagerResponse response = recordFactory
        .newRecordInstance(RegisterNodeManagerResponse.class);

    // Version gate: "NONE" disables the check entirely.
    if (!minimumNodeManagerVersion.equals("NONE")) {
      if (minimumNodeManagerVersion.equals("EqualToRM")) {
        // Replace the symbolic value with the RM's own version. This
        // overwrites the non-local variable, so later registrations compare
        // against the concrete version string directly.
        minimumNodeManagerVersion = YarnVersionInfo.getVersion();
      }

      // A NodeManager that reports no version is rejected like one that is
      // older than the minimum.
      if ((nodeManagerVersion == null) ||
          (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
        String message =
            "Disallowed NodeManager Version " + nodeManagerVersion
                + ", is less than the minimum version "
                + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
                + "NodeManager.";
        LOG.info(message);
        response.setDiagnosticsMessage(message);
        response.setNodeAction(NodeAction.SHUTDOWN);
        return response;
      }
    }

    if (checkIpHostnameInRegistration) {
      InetSocketAddress nmAddress =
          NetUtils.createSocketAddrForHost(host, cmPort);
      InetAddress inetAddress = Server.getRemoteIp();
      // The rejection only applies when a remote IP is available; a null
      // remote IP lets an unresolved host through.
      // NOTE(review): presumably null means "not inside an RPC call" - confirm.
      if (inetAddress != null && nmAddress.isUnresolved()) {
        // Reject registration of unresolved nm to prevent resourcemanager
        // getting stuck at allocations.
        final String message =
            "hostname cannot be resolved (ip=" + inetAddress.getHostAddress()
                + ", hostname=" + host + ")";
        LOG.warn("Unresolved nodemanager registration: " + message);
        response.setDiagnosticsMessage(message);
        response.setNodeAction(NodeAction.SHUTDOWN);
        return response;
      }
    }

    // Check if this node is a 'valid' node. A node that fails the validity
    // check is still accepted while it is in decommissioning.
    if (!this.nodesListManager.isValidNode(host) &&
        !isNodeInDecommissioning(nodeId)) {
      String message =
          "Disallowed NodeManager from  " + host
              + ", Sending SHUTDOWN signal to the NodeManager.";
      LOG.info(message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }

    // Check whether the node's capacity is overridden by
    // dynamic-resources.xml.
    String nid = nodeId.toString();

    Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
    if (dynamicLoadCapability != null) {
      LOG.debug("Resource for node: {} is adjusted from: {} to: {} due to"
          + " settings in dynamic-resources.xml.", nid, capability,
          dynamicLoadCapability);
      // From here on the overriding value is the node's capability; it is
      // used for the minimum-allocation check and the RMNode below.
      capability = dynamicLoadCapability;
      // sync back with new resource.
      response.setResource(capability);
    }

    // Check if this node has minimum allocations
    if (capability.getMemorySize() < minAllocMb
        || capability.getVirtualCores() < minAllocVcores) {
      String message = "NodeManager from  " + host
          + " doesn't satisfy minimum allocations, Sending SHUTDOWN"
          + " signal to the NodeManager. Node capabilities are " + capability
          + "; minimums are " + minAllocMb + "mb and " + minAllocVcores
          + " vcores";
      LOG.info(message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }

    // All admission checks passed: hand the current master keys to the node.
    response.setContainerTokenMasterKey(containerTokenSecretManager
        .getCurrentKey());
    response.setNMTokenMasterKey(nmTokenSecretManager
        .getCurrentKey());

    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
        resolve(host), capability, nodeManagerVersion, physicalResource);

    // putIfAbsent returns null when no node was registered under this id,
    // otherwise it returns the existing node and leaves the map unchanged.
    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
    if (oldNode == null) {
      // First registration for this node id: start the new node, forwarding
      // the container statuses, running applications and node status
      // reported in the request.
      RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
          request.getNMContainerStatuses(),
          request.getRunningApplications(), nodeStatus);
      if (request.getLogAggregationReportsForApps() != null
          && !request.getLogAggregationReportsForApps().isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Found the number of previous cached log aggregation "
              + "status from nodemanager:" + nodeId + " is :"
              + request.getLogAggregationReportsForApps().size());
        }
        startEvent.setLogAggregationReportsForApps(request
            .getLogAggregationReportsForApps());
      }
      this.rmContext.getDispatcher().getEventHandler().handle(
          startEvent);
    } else {
      // A node with this id is already known: treat the request as a
      // reconnect. The node is re-registered with the liveliness monitor
      // after this if/else.
      LOG.info("Reconnect from the node at: " + host);
      this.nmLivelinessMonitor.unregister(nodeId);

      // NOTE(review): the state checks here and in the switch below read
      // rmNode, which is the instance constructed just above, not oldNode.
      // Confirm that using the new instance's state (rather than the
      // previously registered node's state) is intended.
      if (CollectionUtils.isEmpty(request.getRunningApplications())
          && rmNode.getState() != NodeState.DECOMMISSIONING
          && rmNode.getHttpPort() != oldNode.getHttpPort()) {
        // Reconnected node differs, so replace old node and start new node
        switch (rmNode.getState()) {
        case RUNNING:
          ClusterMetrics.getMetrics().decrNumActiveNodes();
          break;
        case UNHEALTHY:
          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
          break;
        default:
          LOG.debug("Unexpected Rmnode state");
        }
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new NodeRemovedSchedulerEvent(rmNode));

        // Unconditionally overwrite the map entry with the new node, then
        // start it without container statuses or running applications.
        this.rmContext.getRMNodes().put(nodeId, rmNode);
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
      } else {
        // Reset heartbeat ID since node just restarted.
        oldNode.resetLastNodeHeartBeatResponse();

        // Keep the existing map entry and pass the new node's details to it
        // through a reconnect event.
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMNodeReconnectEvent(nodeId, rmNode,
                request.getRunningApplications(),
                request.getNMContainerStatuses()));
      }
    }
    // On every node manager register we will be clearing NMToken keys if
    // present for any running application.
    this.nmTokenSecretManager.removeNodeKey(nodeId);
    this.nmLivelinessMonitor.register(nodeId);
    
    // Handle received container status, this should be processed after new
    // RMNode inserted. Only done when work-preserving recovery is disabled.
    // NOTE(review): getNMContainerStatuses() is dereferenced without a null
    // check here - assumes the request never returns null; confirm.
    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
      if (!request.getNMContainerStatuses().isEmpty()) {
        LOG.info("received container statuses on node manager register :"
            + request.getNMContainerStatuses());
        for (NMContainerStatus status : request.getNMContainerStatuses()) {
          handleNMContainerStatus(status, nodeId);
        }
      }
    }

    // Update node's labels to RM's NodeLabelManager.
    Set<String> nodeLabels = NodeLabelsUtils.convertToStringSet(
        request.getNodeLabels());
    // Labels reported by the NM are applied only when
    // isDistributedNodeLabelsConf is set and the converted set is non-null;
    // otherwise, when isDelegatedCentralizedNodeLabelsConf is set, the
    // delegated updater is asked to update labels for this node.
    if (isDistributedNodeLabelsConf && nodeLabels != null) {
      try {
        updateNodeLabelsFromNMReport(nodeLabels, nodeId);
        response.setAreNodeLabelsAcceptedByRM(true);
      } catch (IOException ex) {
        // Ensure the exception is captured in the response
        response.setDiagnosticsMessage(ex.getMessage());
        response.setAreNodeLabelsAcceptedByRM(false);
      }
    } else if (isDelegatedCentralizedNodeLabelsConf) {
      this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
    }

    // Update node's attributes to RM's NodeAttributesManager.
    if (request.getNodeAttributes() != null) {
      try {
        // update node attributes if necessary then update heartbeat response
        updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
        response.setAreNodeAttributesAcceptedByRM(true);
      } catch (IOException ex) {
        // Ensure the error message is captured and sent across in response.
        // Append to any diagnostics already set (e.g. by the node-label
        // update above) instead of overwriting them.
        String errorMsg = response.getDiagnosticsMessage() == null ?
            ex.getMessage() :
            response.getDiagnosticsMessage() + "\n" + ex.getMessage();
        response.setDiagnosticsMessage(errorMsg);
        response.setAreNodeAttributesAcceptedByRM(false);
      }
    }

    // Build a single summary log line for the successful registration;
    // labels and attributes are included only when the RM accepted them.
    StringBuilder message = new StringBuilder();
    message.append("NodeManager from node ").append(host).append("(cmPort: ")
        .append(cmPort).append(" httpPort: ");
    message.append(httpPort).append(") ")
        .append("registered with capability: ").append(capability);
    message.append(", assigned nodeId ").append(nodeId);
    if (response.getAreNodeLabelsAcceptedByRM()) {
      message.append(", node labels { ").append(
          StringUtils.join(",", nodeLabels) + " } ");
    }
    if (response.getAreNodeAttributesAcceptedByRM()) {
      message.append(", node attributes { ")
          .append(request.getNodeAttributes() + " } ");
    }

    LOG.info(message.toString());
    response.setNodeAction(NodeAction.NORMAL);
    response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
    response.setRMVersion(YarnVersionInfo.getVersion());
    return response;
  }