protected void handleContainerCompletion()

in gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java [784:863]


  /**
   * Handles the completion of a single container and, unless processing stops early, posts a
   * {@link NewContainerRequest} on the event bus to replace it.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Remove the container from {@code containerMap}. If it was not present, record its id in
   *       {@code removedContainerID}; otherwise decrement the allocated-container counter of its helix tag.</li>
   *   <li>Log the exit status and, when non-empty, the diagnostics.</li>
   *   <li>For {@link ContainerExitStatus#ABORTED}, delegate to {@code handleAbortedContainer} and stop if it
   *       returns {@code true}.</li>
   *   <li>Stop if a shutdown is in progress.</li>
   *   <li>If the container was known: increment the retry count of its Helix instance and submit a
   *       {@code HELIX_INSTANCE_COMPLETION} event (when an event submitter is present). If
   *       {@code helixInstanceMaxRetries} is positive and has been exceeded, stop without requesting a
   *       replacement; otherwise return the instance name to {@code unusedHelixInstanceNames}.</li>
   *   <li>Post the replacement request.</li>
   * </ol>
   *
   * @param containerStatus the status reported for the completed container
   */
  protected void handleContainerCompletion(ContainerStatus containerStatus) {
    ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId());

    // Resolve the Helix instance name and tag for the completed container. Because callbacks are processed
    // asynchronously, handleContainerCompletion() may be called before onContainersAllocated(), in which case the
    // containerId is missing from containerMap.
    // We use removedContainerID to remember these containers and remove them from containerMap later when we call
    // the requestTargetNumberOfContainers method.
    final String completedInstanceName;
    final String helixTag;
    if (completedContainerInfo == null) {
      removedContainerID.putIfAbsent(containerStatus.getContainerId(), "");
      completedInstanceName = UNKNOWN_HELIX_INSTANCE;
      helixTag = helixInstanceTags;
    } else {
      completedInstanceName = completedContainerInfo.getHelixParticipantId();
      helixTag = completedContainerInfo.getHelixTag();
      allocatedContainerCountMap.get(helixTag).decrementAndGet();
    }

    // Parameterized logging: the message is only assembled when the level is enabled, and the diagnostics text is
    // passed as an argument rather than as part of the message pattern.
    LOGGER.info("Container {} running Helix instance {} with tag {} has completed with exit status {}",
        containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus());

    if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
      LOGGER.info("Received the following diagnostics information for container {}: {}",
          containerStatus.getContainerId(), containerStatus.getDiagnostics());
    }

    switch (containerStatus.getExitStatus()) {
      case ContainerExitStatus.ABORTED:
        // The aborted-container handler decides whether the rest of this method should be skipped.
        if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) {
          return;
        }
        break;
      case 1: // Same as linux exit status 1 Often occurs when launch_container.sh failed
        LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo);
        break;
      default:
        break;
    }

    if (this.shutdownInProgress) {
      return;
    }

    if (completedContainerInfo != null) {
      this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
      int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();

      // Populate event metadata; the builder is only present when an event submitter is present.
      Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent();
      if (this.eventSubmitter.isPresent()) {
        eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
        eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
        eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
            String.valueOf(retryCount));
      }

      // A non-positive helixInstanceMaxRetries disables the retry limit.
      if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
        if (this.eventSubmitter.isPresent()) {
          this.eventSubmitter.get()
              .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
        }

        LOGGER.warn("Maximum number of retries has been achieved for Helix instance {}", completedInstanceName);
        return;
      }

      // Add the Helix instance name of the completed container to the set of unused
      // instance names so they can be reused by a replacement container.
      LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
      this.unusedHelixInstanceNames.add(completedInstanceName);

      if (this.eventSubmitter.isPresent()) {
        this.eventSubmitter.get()
            .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
      }
    }

    // Without a known container there is no resource to copy, so the request carries an absent resource.
    Optional<Resource> newContainerResource = completedContainerInfo != null ?
        Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
    LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
        containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
    // The completed container is only passed along when the exit status calls for sticking to the same node
    // and the container is known.
    this.eventBus.post(new NewContainerRequest(
        shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
            Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
  }