samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java [70:205]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnJobValidationTool {
  private static final Logger log = LoggerFactory.getLogger(YarnJobValidationTool.class);

  private final JobConfig config;
  private final YarnClient client;
  private final String jobName;
  private final MetricsValidator validator;

  /**
   * Creates a validation tool for a single job.
   *
   * @param config job configuration; its name and job id are joined as {@code <name>_<jobId>} to
   *               form the identifier this tool works with
   * @param client YARN client used to query applications, attempts and containers
   * @param validator metrics validator applied to the containers' JMX endpoints; {@link #run()}
   *                  skips the JMX step when this is {@code null}
   */
  public YarnJobValidationTool(JobConfig config, YarnClient client, MetricsValidator validator) {
    this.config = config;
    this.client = client;
    this.validator = validator;
    this.jobName = config.getName().get() + "_" + config.getJobId();
  }

  /**
   * Runs the validation steps in order: application lookup, running-attempt check, container
   * count check and, when a validator was supplied, JMX metrics validation.
   *
   * Any exception raised by a step is logged and the JVM exits with status 1.
   */
  public void run() {
    try {
      log.info("Start validating job " + this.jobName);

      final ApplicationId latestAppId = validateAppId();
      final ApplicationAttemptId runningAttemptId = validateRunningAttemptId(latestAppId);
      validateContainerCount(runningAttemptId);

      // the metrics step is optional: it only runs when a validator was provided
      if (validator != null) {
        validateJmxMetrics();
      }

      log.info("End of validation");
    } catch (Exception e) {
      log.error(e.getMessage(), e);
      System.exit(1);
    }
  }

  /**
   * Looks up the YARN application for this job.
   *
   * Only applications whose name equals the {@code <name>_<jobId>} identifier are considered. When
   * several match, the one with the greatest application id is chosen; it is treated as the most
   * recently created one.
   *
   * @return the greatest application id among the matching applications
   * @throws Exception a {@link SamzaException} when no application matches, or whatever the YARN
   *                   client throws while listing applications
   */
  public ApplicationId validateAppId() throws Exception {
    ApplicationId latest = null;
    for (ApplicationReport report : this.client.getApplications()) {
      if (!report.getName().equals(this.jobName)) {
        continue;
      }
      ApplicationId candidate = report.getApplicationId();
      // keep the greatest id seen so far
      if (latest == null || latest.compareTo(candidate) < 0) {
        latest = candidate;
      }
    }

    if (latest == null) {
      throw new SamzaException("Job lookup failure " + this.jobName);
    }
    log.info("Job lookup success. ApplicationId " + latest.toString());
    return latest;
  }

  /**
   * Checks that the current attempt of the given application is in the RUNNING state.
   *
   * @param appId id of the application to inspect
   * @return the id of the application's current attempt
   * @throws Exception a {@link SamzaException} when the current attempt is not RUNNING (the message
   *                   names the attempt and the state that was observed), or whatever the YARN
   *                   client throws while fetching the reports
   */
  public ApplicationAttemptId validateRunningAttemptId(ApplicationId appId) throws Exception {
    ApplicationAttemptId attemptId = this.client.getApplicationReport(appId).getCurrentApplicationAttemptId();
    ApplicationAttemptReport attemptReport = this.client.getApplicationAttemptReport(attemptId);
    YarnApplicationAttemptState attemptState = attemptReport.getYarnApplicationAttemptState();

    if (attemptState != YarnApplicationAttemptState.RUNNING) {
      // keep the original "Job not running <jobName>" prefix and append the details needed to diagnose it
      throw new SamzaException("Job not running " + this.jobName
          + ". Attempt " + attemptId + " is in state " + attemptState);
    }
    log.info("Job is running. AttemptId " + attemptId.toString());
    return attemptId;
  }

  /**
   * Checks that the number of RUNNING containers of the given attempt equals the configured
   * container count plus one (the extra container is the AppMaster).
   *
   * @param attemptId application attempt whose containers are counted
   * @return the number of running containers, when it matches the expected number
   * @throws Exception a {@link SamzaException} when the numbers differ, or whatever the YARN client
   *                   throws while listing containers
   */
  public int validateContainerCount(ApplicationAttemptId attemptId) throws Exception {
    int running = 0;
    for (ContainerReport report : this.client.getContainers(attemptId)) {
      running += report.getContainerState() == ContainerState.RUNNING ? 1 : 0;
    }

    // the AppMaster occupies one container on top of the configured job containers
    final int expected = 1 + this.config.getContainerCount();

    if (running != expected) {
      throw new SamzaException("Container count does not match. " + running + " containers are running, while " + expected + " is expected.");
    }
    log.info("Container count matches. " + running + " containers are running.");
    return running;
  }

  /**
   * Validates the metrics of every container that published a JMX tunneling URL.
   *
   * The processor localities are read through a {@link LocalityManager} backed by the coordinator
   * stream store. For each locality with a non-blank JMX tunneling URL, a JMX connection is opened,
   * handed to the validator and closed again. After all containers were visited the validator's
   * {@code complete()} is invoked. The coordinator stream store is closed on every exit path.
   *
   * The validator must be non-null here; {@link #run()} only calls this method in that case.
   *
   * @throws Exception whatever the store, the JMX accessor or the validator throws
   */
  public void validateJmxMetrics() throws Exception {
    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
    coordinatorStreamStore.init();
    try {
      LocalityManager localityManager =
          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
      validator.init(config);
      LocalityModel localityModel = localityManager.readLocality();

      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
        String containerId = processorLocality.id();
        String jmxUrl = processorLocality.jmxTunnelingUrl();
        if (StringUtils.isNotBlank(jmxUrl)) {
          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
          // connect outside the try: there is nothing to close if the connection was never established
          jmxMetrics.connect();
          try {
            validator.validate(jmxMetrics);
          } finally {
            // release the JMX connection even when validation fails
            jmxMetrics.close();
          }
          log.info("validate container " + containerId + " successfully");
        }
      }

      validator.complete();
    } finally {
      coordinatorStreamStore.close();
    }
  }

  public static void main(String[] args) throws Exception {
    CommandLine cmdline = new CommandLine();
    OptionParser parser = cmdline.parser();
    OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.")
                                            .withOptionalArg()
                                            .ofType(String.class).describedAs("com.foo.bar.ClassName");
    OptionSet options = cmdline.parser().parse(args);
    Config config = cmdline.loadConfig(options);
    MetricsValidator validator = null;
    if (options.has(validatorOpt)) {
      String validatorClass = options.valueOf(validatorOpt);
      validator = ReflectionUtil.getObj(validatorClass, MetricsValidator.class);
    }

    YarnConfiguration hadoopConfig = new YarnConfiguration();
    hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName());
    hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName());
    ClientHelper clientHelper = new ClientHelper(hadoopConfig);

    new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java [70:205]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class YarnJobValidationTool {
  private static final Logger log = LoggerFactory.getLogger(YarnJobValidationTool.class);

  private final JobConfig config;
  private final YarnClient client;
  private final String jobName;
  private final MetricsValidator validator;

  /**
   * Creates a validation tool for a single job.
   *
   * @param config job configuration; its name and job id are joined as {@code <name>_<jobId>} to
   *               form the identifier this tool works with
   * @param client YARN client used to query applications, attempts and containers
   * @param validator metrics validator applied to the containers' JMX endpoints; {@link #run()}
   *                  skips the JMX step when this is {@code null}
   */
  public YarnJobValidationTool(JobConfig config, YarnClient client, MetricsValidator validator) {
    this.config = config;
    this.client = client;
    this.validator = validator;
    this.jobName = config.getName().get() + "_" + config.getJobId();
  }

  /**
   * Runs the validation steps in order: application lookup, running-attempt check, container
   * count check and, when a validator was supplied, JMX metrics validation.
   *
   * Any exception raised by a step is logged and the JVM exits with status 1.
   */
  public void run() {
    try {
      log.info("Start validating job " + this.jobName);

      final ApplicationId latestAppId = validateAppId();
      final ApplicationAttemptId runningAttemptId = validateRunningAttemptId(latestAppId);
      validateContainerCount(runningAttemptId);

      // the metrics step is optional: it only runs when a validator was provided
      if (validator != null) {
        validateJmxMetrics();
      }

      log.info("End of validation");
    } catch (Exception e) {
      log.error(e.getMessage(), e);
      System.exit(1);
    }
  }

  /**
   * Looks up the YARN application for this job.
   *
   * Only applications whose name equals the {@code <name>_<jobId>} identifier are considered. When
   * several match, the one with the greatest application id is chosen; it is treated as the most
   * recently created one.
   *
   * @return the greatest application id among the matching applications
   * @throws Exception a {@link SamzaException} when no application matches, or whatever the YARN
   *                   client throws while listing applications
   */
  public ApplicationId validateAppId() throws Exception {
    ApplicationId latest = null;
    for (ApplicationReport report : this.client.getApplications()) {
      if (!report.getName().equals(this.jobName)) {
        continue;
      }
      ApplicationId candidate = report.getApplicationId();
      // keep the greatest id seen so far
      if (latest == null || latest.compareTo(candidate) < 0) {
        latest = candidate;
      }
    }

    if (latest == null) {
      throw new SamzaException("Job lookup failure " + this.jobName);
    }
    log.info("Job lookup success. ApplicationId " + latest.toString());
    return latest;
  }

  /**
   * Checks that the current attempt of the given application is in the RUNNING state.
   *
   * @param appId id of the application to inspect
   * @return the id of the application's current attempt
   * @throws Exception a {@link SamzaException} when the current attempt is not RUNNING (the message
   *                   names the attempt and the state that was observed), or whatever the YARN
   *                   client throws while fetching the reports
   */
  public ApplicationAttemptId validateRunningAttemptId(ApplicationId appId) throws Exception {
    ApplicationAttemptId attemptId = this.client.getApplicationReport(appId).getCurrentApplicationAttemptId();
    ApplicationAttemptReport attemptReport = this.client.getApplicationAttemptReport(attemptId);
    YarnApplicationAttemptState attemptState = attemptReport.getYarnApplicationAttemptState();

    if (attemptState != YarnApplicationAttemptState.RUNNING) {
      // keep the original "Job not running <jobName>" prefix and append the details needed to diagnose it
      throw new SamzaException("Job not running " + this.jobName
          + ". Attempt " + attemptId + " is in state " + attemptState);
    }
    log.info("Job is running. AttemptId " + attemptId.toString());
    return attemptId;
  }

  /**
   * Checks that the number of RUNNING containers of the given attempt equals the configured
   * container count plus one (the extra container is the AppMaster).
   *
   * @param attemptId application attempt whose containers are counted
   * @return the number of running containers, when it matches the expected number
   * @throws Exception a {@link SamzaException} when the numbers differ, or whatever the YARN client
   *                   throws while listing containers
   */
  public int validateContainerCount(ApplicationAttemptId attemptId) throws Exception {
    int running = 0;
    for (ContainerReport report : this.client.getContainers(attemptId)) {
      running += report.getContainerState() == ContainerState.RUNNING ? 1 : 0;
    }

    // the AppMaster occupies one container on top of the configured job containers
    final int expected = 1 + this.config.getContainerCount();

    if (running != expected) {
      throw new SamzaException("Container count does not match. " + running + " containers are running, while " + expected + " is expected.");
    }
    log.info("Container count matches. " + running + " containers are running.");
    return running;
  }

  /**
   * Validates the metrics of every container that published a JMX tunneling URL.
   *
   * The processor localities are read through a {@link LocalityManager} backed by the coordinator
   * stream store. For each locality with a non-blank JMX tunneling URL, a JMX connection is opened,
   * handed to the validator and closed again. After all containers were visited the validator's
   * {@code complete()} is invoked. The coordinator stream store is closed on every exit path.
   *
   * The validator must be non-null here; {@link #run()} only calls this method in that case.
   *
   * @throws Exception whatever the store, the JMX accessor or the validator throws
   */
  public void validateJmxMetrics() throws Exception {
    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
    coordinatorStreamStore.init();
    try {
      LocalityManager localityManager =
          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
      validator.init(config);
      LocalityModel localityModel = localityManager.readLocality();

      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
        String containerId = processorLocality.id();
        String jmxUrl = processorLocality.jmxTunnelingUrl();
        if (StringUtils.isNotBlank(jmxUrl)) {
          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
          // connect outside the try: there is nothing to close if the connection was never established
          jmxMetrics.connect();
          try {
            validator.validate(jmxMetrics);
          } finally {
            // release the JMX connection even when validation fails
            jmxMetrics.close();
          }
          log.info("validate container " + containerId + " successfully");
        }
      }

      validator.complete();
    } finally {
      coordinatorStreamStore.close();
    }
  }

  public static void main(String[] args) throws Exception {
    CommandLine cmdline = new CommandLine();
    OptionParser parser = cmdline.parser();
    OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.")
                                            .withOptionalArg()
                                            .ofType(String.class).describedAs("com.foo.bar.ClassName");
    OptionSet options = cmdline.parser().parse(args);
    Config config = cmdline.loadConfig(options);
    MetricsValidator validator = null;
    if (options.has(validatorOpt)) {
      String validatorClass = options.valueOf(validatorOpt);
      validator = ReflectionUtil.getObj(validatorClass, MetricsValidator.class);
    }

    YarnConfiguration hadoopConfig = new YarnConfiguration();
    hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName());
    hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName());
    ClientHelper clientHelper = new ClientHelper(hadoopConfig);

    new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



