samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java [157:184]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Validates the metrics of every container that has published a JMX tunneling URL.
   *
   * <p>The processor localities are read through a {@link LocalityManager} backed by the
   * coordinator stream store. For each locality whose JMX tunneling URL is not blank, a
   * {@link JmxMetricsAccessor} is connected, handed to the validator, and then closed.
   * Localities with a blank URL are skipped. After all containers have been visited,
   * {@code validator.complete()} is invoked.
   *
   * @throws Exception if reading the locality, connecting over JMX, or the validation itself fails
   */
  public void validateJmxMetrics() throws Exception {
    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
    coordinatorStreamStore.init();
    try {
      LocalityManager localityManager =
          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
      validator.init(config);
      LocalityModel localityModel = localityManager.readLocality();

      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
        String containerId = processorLocality.id();
        String jmxUrl = processorLocality.jmxTunnelingUrl();
        if (StringUtils.isNotBlank(jmxUrl)) {
          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
          // connect() stays outside the try block: if it fails there is no open connection to close.
          jmxMetrics.connect();
          try {
            validator.validate(jmxMetrics);
          } finally {
            // Release the JMX connection even when validation throws, so a failing
            // container does not leak its connection.
            jmxMetrics.close();
          }
          log.info("validate container " + containerId + " successfully");
        }
      }

      validator.complete();
    } finally {
      coordinatorStreamStore.close();
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-yarn3/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java [157:184]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Validates the metrics of every container that has published a JMX tunneling URL.
   *
   * <p>The processor localities are read through a {@link LocalityManager} backed by the
   * coordinator stream store. For each locality whose JMX tunneling URL is not blank, a
   * {@link JmxMetricsAccessor} is connected, handed to the validator, and then closed.
   * Localities with a blank URL are skipped. After all containers have been visited,
   * {@code validator.complete()} is invoked.
   *
   * @throws Exception if reading the locality, connecting over JMX, or the validation itself fails
   */
  public void validateJmxMetrics() throws Exception {
    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry);
    coordinatorStreamStore.init();
    try {
      LocalityManager localityManager =
          new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
      validator.init(config);
      LocalityModel localityModel = localityManager.readLocality();

      for (ProcessorLocality processorLocality : localityModel.getProcessorLocalities().values()) {
        String containerId = processorLocality.id();
        String jmxUrl = processorLocality.jmxTunnelingUrl();
        if (StringUtils.isNotBlank(jmxUrl)) {
          log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
          // connect() stays outside the try block: if it fails there is no open connection to close.
          jmxMetrics.connect();
          try {
            validator.validate(jmxMetrics);
          } finally {
            // Release the JMX connection even when validation throws, so a failing
            // container does not leak its connection.
            jmxMetrics.close();
          }
          log.info("validate container " + containerId + " successfully");
        }
      }

      validator.complete();
    } finally {
      coordinatorStreamStore.close();
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



