static int run()

in samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java [108:213]


  /**
   * Builds a {@link SamzaContainer} for the given container and job model, runs it, and reports the outcome as an
   * exit code instead of exiting the process itself.
   *
   * <p>The coordinator stream store is initialized before anything else and is closed in the {@code finally} block
   * on both the success and the failure path. Any {@link Throwable} raised while wiring or running the container is
   * logged and converted into a non-zero exit code.
   *
   * @param appDesc application descriptor used to obtain the task factory, the context factories and the
   *                processor lifecycle listener factory
   * @param jobName job name, passed to the diagnostics manager builder
   * @param jobId job id, passed to the diagnostics manager builder
   * @param containerId id of the container to create and run
   * @param executionEnvContainerId optional execution environment container id; when present and application master
   *                                high availability is enabled, its mapping to {@code containerId} is written
   *                                through an {@link ExecutionContainerIdManager}
   * @param samzaEpochId optional epoch id, passed to the diagnostics manager builder
   * @param jobModel job model handed to the container
   * @param config job configuration
   * @param externalContextOptional optional external context handed to the container
   * @return {@code 0} when the container run finished and no container exception was recorded; {@code 1} when a
   *         container exception was recorded or any {@link Throwable} was caught
   */
  static int run(
      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
      String jobName,
      String jobId,
      String containerId,
      Optional<String> executionEnvContainerId,
      Optional<String> samzaEpochId,
      JobModel jobModel,
      Config config,
      Optional<ExternalContext> externalContextOptional) {
    CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
    coordinatorStreamStore.init();

    /*
     * We track the exit code and return it only after the finally block has run, to make sure we are able to
     * execute all the clean up steps. Prior implementation had short circuited exit causing some of the clean up
     * steps to be missed.
     */
    int exitCode = 0;

    try {
      TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
      LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));

      // Single JobConfig view over the config, shared by every feature-flag check below.
      JobConfig jobConfig = new JobConfig(config);

      // StartpointManager wraps the coordinatorStreamStore in the namespaces internally
      StartpointManager startpointManager = null;
      if (jobConfig.getStartpointEnabled()) {
        startpointManager = new StartpointManager(coordinatorStreamStore);
      }

      Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);

      // Creating diagnostics manager and reporter, and wiring it respectively
      Optional<DiagnosticsManager> diagnosticsManager =
          DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
              samzaEpochId, config);
      MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();

      // The drain monitor stays null (and is passed as null to the container) when the feature is disabled.
      DrainMonitor drainMonitor = null;
      if (jobConfig.getDrainMonitorEnabled()) {
        drainMonitor = new DrainMonitor(coordinatorStreamStore, config);
      }

      // Java Optionals are bridged to Scala Options via orElse(null) + Option.apply, which maps null to None.
      SamzaContainer container = SamzaContainer$.MODULE$.apply(
          containerId, jobModel,
          ScalaJavaUtil.toScalaMap(metricsReporters),
          metricsRegistryMap,
          taskFactory,
          JobContextImpl.fromConfigWithDefaults(config, jobModel),
          Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
          Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
          Option.apply(externalContextOptional.orElse(null)),
          localityManager,
          startpointManager,
          Option.apply(diagnosticsManager.orElse(null)),
          drainMonitor);

      ProcessorLifecycleListener processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory()
          .createInstance(new ProcessorContext() { }, config);
      ClusterBasedProcessorLifecycleListener
          listener = new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
      container.setContainerListener(listener);

      // createContainerHeartbeatMonitor may return null, hence the null checks around start/stop.
      ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container,
          new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE), config);
      if (heartbeatMonitor != null) {
        heartbeatMonitor.start();
      }

      try {
        if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
          executionEnvContainerId.ifPresent(execEnvContainerId -> {
            ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
                new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
            executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
          });
        }

        container.run();
      } finally {
        // Stop the monitor even when the steps above throw, so a started monitor is never left running.
        if (heartbeatMonitor != null) {
          heartbeatMonitor.stop();
        }
      }

      // Check to see if the HeartbeatMonitor has set an exception before
      // overriding the value with what the listener returns
      if (containerRunnerException == null) {
        containerRunnerException = listener.getContainerException();
      }

      if (containerRunnerException != null) {
        log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
        exitCode = 1;
      }
    } catch (Throwable e) {
      // Record the failure before logging so that a problem while logging cannot leave the exit code at 0.
      exitCode = 1;
      /*
       * Two separate log statements are intended to print the entire stack trace as part of the logs. Using
       * single log statement with custom format requires explicitly fetching stack trace and null checks which makes
       * the code slightly hard to read in comparison with the current choice.
       */
      log.error("Exiting the process due to", e);
      log.error("Container runner exception: ", containerRunnerException);
    } finally {
      coordinatorStreamStore.close();
    }

    // Returning after (not inside) the finally block avoids discarding exceptions that escape the catch block.
    return exitCode;
  }