boolean checkStandbyConstraints()

in samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java [440:480]


  boolean checkStandbyConstraints(String containerIdToStart, String host) {
    List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIdToStart);

    // Check if any of these conflicting containers are running/launching on host
    for (String containerID : containerIDsForStandbyConstraints) {
      SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);

      // return false if a conflicting container is pending for launch on the host
      if (resource != null && isFaultDomainAwareStandbyEnabled
              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this fault domain",
                containerIdToStart, host, containerID);
        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
        }
        return false;
      } else if (resource != null && resource.getHost().equals(host)) {
        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
                containerIdToStart, host, containerID);
        return false;
      }

      // return false if a conflicting container is running on the host
      resource = samzaApplicationState.runningProcessors.get(containerID);
      if (resource != null && isFaultDomainAwareStandbyEnabled
              && faultDomainManager.hasSameFaultDomains(host, resource.getHost())) {
        log.info("Container {} cannot be started on host {} because container {} is already running on this fault domain",
                containerIdToStart, host, containerID);
        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
        }
        return false;
      } else if (resource != null && resource.getHost().equals(host)) {
        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
                containerIdToStart, host, containerID);
        return false;
      }
    }

    return true;
  }