samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java [46:129]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnFaultDomainManager implements FaultDomainManager {

  // Log under this class's own name rather than the FaultDomainManager interface, so that log
  // output and per-logger configuration identify the Yarn implementation.
  private static final Logger log = LoggerFactory.getLogger(YarnFaultDomainManager.class);
  // Metrics group and counter name used when creating the cache update counter.
  private static final String FAULT_DOMAIN_MANAGER_GROUP = "yarn-fault-domain-manager";
  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = "host-to-fault-domain-cache-updates";
  // Cached host to rack mapping; replaced wholesale whenever a lookup misses the cache.
  private Multimap<String, FaultDomain> hostToRackMap;
  private final YarnClientImpl yarnClient;
  // Incremented every time the cache is recomputed after a lookup miss.
  private Counter hostToFaultDomainCacheUpdates;

  /**
   * Creates a fault domain manager backed by a newly created and started Yarn client, and eagerly
   * computes the initial host to rack cache from Yarn.
   * If the initial computation fails, the Yarn client started here is stopped before the failure
   * is propagated, so a failed construction does not leave a running client behind.
   * @param metricsRegistry registry in which the cache update counter is created
   */
  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
    this.yarnClient = new YarnClientImpl();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();
    try {
      this.hostToRackMap = computeHostToFaultDomainMap();
    } catch (RuntimeException e) {
      // Nobody else holds a reference to the client yet, so it must be stopped here or it leaks.
      try {
        yarnClient.stop();
      } catch (RuntimeException stopFailure) {
        // Keep the original failure as the primary exception.
        e.addSuppressed(stopFailure);
      }
      throw e;
    }
    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
  }

  /**
   * Test-only constructor that accepts the Yarn client and the initial host to rack cache
   * instead of creating them. The supplied client is still initialized and started here.
   * @param metricsRegistry registry in which the cache update counter is created
   * @param yarnClient the Yarn client to use for recomputing the cache
   * @param hostToRackMap the initial contents of the host to rack cache
   */
  @VisibleForTesting
  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
    // Plain field stores first; they do not interact with the client lifecycle calls below.
    this.yarnClient = yarnClient;
    this.hostToRackMap = hostToRackMap;

    this.yarnClient.init(new YarnConfiguration());
    this.yarnClient.start();

    this.hostToFaultDomainCacheUpdates =
        metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
  }

  /**
   * Collects the distinct racks currently held in the cached host to rack map.
   * This call does not refresh the cache, so the result reflects the last computed state.
   * @return a new set containing every cached {@link FaultDomain}
   */
  @Override
  public Set<FaultDomain> getAllFaultDomains() {
    Set<FaultDomain> cachedFaultDomains = new HashSet<>();
    cachedFaultDomains.addAll(hostToRackMap.values());
    return cachedFaultDomains;
  }

  /**
   * Looks up the racks of a host in the internal cache.
   * When the host is missing from the cache, the whole host to rack map is recomputed from Yarn
   * once and the cache update counter is incremented before the lookup is answered.
   * @param host the host
   * @return a new set of the {@link FaultDomain}s cached for the host
   */
  @Override
  public Set<FaultDomain> getFaultDomainsForHost(String host) {
    boolean hostIsCached = hostToRackMap.containsKey(host);
    if (!hostIsCached) {
      // Cache miss: rebuild the entire mapping from Yarn and record the refresh.
      hostToRackMap = computeHostToFaultDomainMap();
      hostToFaultDomainCacheUpdates.inc();
    }
    Set<FaultDomain> faultDomainsForHost = new HashSet<>(hostToRackMap.get(host));
    return faultDomainsForHost;
  }

  /**
   * Compares the cached racks of two hosts.
   * When either host is missing from the cache, the host to rack map is recomputed from Yarn once
   * and the cache update counter is incremented before the comparison is made.
   * NOTE(review): if neither host is present even after the refresh, both lookups presumably
   * yield empty collections that compare equal, so this would return true; confirm this is intended.
   * @param host1 hostname
   * @param host2 hostname
   * @return true if the fault domains cached for both hosts are equal
   */
  @Override
  public boolean hasSameFaultDomains(String host1, String host2) {
    boolean bothHostsCached = hostToRackMap.containsKey(host1) && hostToRackMap.containsKey(host2);
    if (!bothHostsCached) {
      // At least one host is unknown: rebuild the entire mapping from Yarn and record the refresh.
      hostToRackMap = computeHostToFaultDomainMap();
      hostToFaultDomainCacheUpdates.inc();
    }
    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
  }

  /**
   * This method computes the host to rack map from Yarn.
   * Only the hosts that are running in the cluster will be a part of this map.
   * @return map of the host and the rack it resides on
   */
  @VisibleForTesting
  Multimap<String, FaultDomain> computeHostToFaultDomainMap() {
    Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
    try {
      List<NodeReport> nodeReport = yarnClient.getNodeReports(NodeState.RUNNING);
      nodeReport.forEach(report -> {
        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
        hostToRackMap.put(report.getNodeId().getHost(), rack);
      });
      log.info("Computed the host to rack map successfully from Yarn.");
    } catch (YarnException | IOException e) {
      throw new SamzaException("Yarn threw an exception while getting NodeReports.", e);
    }
    return hostToRackMap;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java [46:129]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnFaultDomainManager implements FaultDomainManager {

  // Log under this class's own name rather than the FaultDomainManager interface, so that log
  // output and per-logger configuration identify the Yarn implementation.
  private static final Logger log = LoggerFactory.getLogger(YarnFaultDomainManager.class);
  // Metrics group and counter name used when creating the cache update counter.
  private static final String FAULT_DOMAIN_MANAGER_GROUP = "yarn-fault-domain-manager";
  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = "host-to-fault-domain-cache-updates";
  // Cached host to rack mapping; replaced wholesale whenever a lookup misses the cache.
  private Multimap<String, FaultDomain> hostToRackMap;
  private final YarnClientImpl yarnClient;
  // Incremented every time the cache is recomputed after a lookup miss.
  private Counter hostToFaultDomainCacheUpdates;

  /**
   * Creates a fault domain manager backed by a newly created and started Yarn client, and eagerly
   * computes the initial host to rack cache from Yarn.
   * If the initial computation fails, the Yarn client started here is stopped before the failure
   * is propagated, so a failed construction does not leave a running client behind.
   * @param metricsRegistry registry in which the cache update counter is created
   */
  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
    this.yarnClient = new YarnClientImpl();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();
    try {
      this.hostToRackMap = computeHostToFaultDomainMap();
    } catch (RuntimeException e) {
      // Nobody else holds a reference to the client yet, so it must be stopped here or it leaks.
      try {
        yarnClient.stop();
      } catch (RuntimeException stopFailure) {
        // Keep the original failure as the primary exception.
        e.addSuppressed(stopFailure);
      }
      throw e;
    }
    hostToFaultDomainCacheUpdates = metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
  }

  /**
   * Test-only constructor that accepts the Yarn client and the initial host to rack cache
   * instead of creating them. The supplied client is still initialized and started here.
   * @param metricsRegistry registry in which the cache update counter is created
   * @param yarnClient the Yarn client to use for recomputing the cache
   * @param hostToRackMap the initial contents of the host to rack cache
   */
  @VisibleForTesting
  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
    // Plain field stores first; they do not interact with the client lifecycle calls below.
    this.yarnClient = yarnClient;
    this.hostToRackMap = hostToRackMap;

    this.yarnClient.init(new YarnConfiguration());
    this.yarnClient.start();

    this.hostToFaultDomainCacheUpdates =
        metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
  }

  /**
   * Collects the distinct racks currently held in the cached host to rack map.
   * This call does not refresh the cache, so the result reflects the last computed state.
   * @return a new set containing every cached {@link FaultDomain}
   */
  @Override
  public Set<FaultDomain> getAllFaultDomains() {
    Set<FaultDomain> cachedFaultDomains = new HashSet<>();
    cachedFaultDomains.addAll(hostToRackMap.values());
    return cachedFaultDomains;
  }

  /**
   * Looks up the racks of a host in the internal cache.
   * When the host is missing from the cache, the whole host to rack map is recomputed from Yarn
   * once and the cache update counter is incremented before the lookup is answered.
   * @param host the host
   * @return a new set of the {@link FaultDomain}s cached for the host
   */
  @Override
  public Set<FaultDomain> getFaultDomainsForHost(String host) {
    boolean hostIsCached = hostToRackMap.containsKey(host);
    if (!hostIsCached) {
      // Cache miss: rebuild the entire mapping from Yarn and record the refresh.
      hostToRackMap = computeHostToFaultDomainMap();
      hostToFaultDomainCacheUpdates.inc();
    }
    Set<FaultDomain> faultDomainsForHost = new HashSet<>(hostToRackMap.get(host));
    return faultDomainsForHost;
  }

  /**
   * Compares the cached racks of two hosts.
   * When either host is missing from the cache, the host to rack map is recomputed from Yarn once
   * and the cache update counter is incremented before the comparison is made.
   * NOTE(review): if neither host is present even after the refresh, both lookups presumably
   * yield empty collections that compare equal, so this would return true; confirm this is intended.
   * @param host1 hostname
   * @param host2 hostname
   * @return true if the fault domains cached for both hosts are equal
   */
  @Override
  public boolean hasSameFaultDomains(String host1, String host2) {
    boolean bothHostsCached = hostToRackMap.containsKey(host1) && hostToRackMap.containsKey(host2);
    if (!bothHostsCached) {
      // At least one host is unknown: rebuild the entire mapping from Yarn and record the refresh.
      hostToRackMap = computeHostToFaultDomainMap();
      hostToFaultDomainCacheUpdates.inc();
    }
    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
  }

  /**
   * This method computes the host to rack map from Yarn.
   * Only the hosts that are running in the cluster will be a part of this map.
   * @return map of the host and the rack it resides on
   */
  @VisibleForTesting
  Multimap<String, FaultDomain> computeHostToFaultDomainMap() {
    Multimap<String, FaultDomain> hostToRackMap = HashMultimap.create();
    try {
      List<NodeReport> nodeReport = yarnClient.getNodeReports(NodeState.RUNNING);
      nodeReport.forEach(report -> {
        FaultDomain rack = new FaultDomain(FaultDomainType.RACK, report.getRackName());
        hostToRackMap.put(report.getNodeId().getHost(), rack);
      });
      log.info("Computed the host to rack map successfully from Yarn.");
    } catch (YarnException | IOException e) {
      throw new SamzaException("Yarn threw an exception while getting NodeReports.", e);
    }
    return hostToRackMap;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



