in samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java [108:213]
/**
 * Builds a {@link SamzaContainer} for the given application and job model, runs it to completion and
 * reports the outcome as an exit code instead of terminating the process directly.
 *
 * <p>The coordinator stream store is created and initialized up front and is closed in a {@code finally}
 * block. Note that {@code init()} is invoked before the {@code try} block, so a failure during
 * initialization propagates to the caller without {@code close()} being called.
 *
 * <p>If a heartbeat monitor is created it is started before the container runs and is always stopped
 * afterwards, including when the container (or the execution-container-id registration) throws.
 *
 * @param appDesc application descriptor supplying the task factory, context factories and lifecycle listener factory
 * @param jobName job name, passed to the diagnostics manager builder
 * @param jobId job id, passed to the diagnostics manager builder
 * @param containerId id of the container to run
 * @param executionEnvContainerId optional execution-environment container id; when present and AM high
 *                                availability is enabled, its mapping to {@code containerId} is written to
 *                                the coordinator stream
 * @param samzaEpochId optional epoch id, passed to the diagnostics manager builder
 * @param jobModel job model the container is built from
 * @param config job configuration
 * @param externalContextOptional optional external context handed to the container
 * @return {@code 0} when the container finished without a recorded exception; {@code 1} when a throwable
 *         was caught during setup/run or a container exception was recorded
 */
static int run(
    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
    String jobName,
    String jobId,
    String containerId,
    Optional<String> executionEnvContainerId,
    Optional<String> samzaEpochId,
    JobModel jobModel,
    Config config,
    Optional<ExternalContext> externalContextOptional) {
  CoordinatorStreamStore coordinatorStreamStore = buildCoordinatorStreamStore(config, new MetricsRegistryMap());
  coordinatorStreamStore.init();

  /*
   * We track the exit code and only report it after the finally block to make sure we are able to execute all the
   * clean up steps. Prior implementation had short circuited exit causing some of the clean up steps to be missed.
   */
  int exitCode = 0;
  try {
    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
    LocalityManager localityManager = new LocalityManager(
        new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));

    // Single JobConfig view of the configuration, reused for every feature flag below.
    JobConfig jobConfig = new JobConfig(config);

    // StartpointManager wraps the coordinatorStreamStore in the namespaces internally
    StartpointManager startpointManager = null;
    if (jobConfig.getStartpointEnabled()) {
      startpointManager = new StartpointManager(coordinatorStreamStore);
    }

    Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);

    // Creating diagnostics manager and reporter, and wiring it respectively
    Optional<DiagnosticsManager> diagnosticsManager =
        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, executionEnvContainerId,
            samzaEpochId, config);
    MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();

    DrainMonitor drainMonitor = null;
    if (jobConfig.getDrainMonitorEnabled()) {
      drainMonitor = new DrainMonitor(coordinatorStreamStore, config);
    }

    SamzaContainer container = SamzaContainer$.MODULE$.apply(
        containerId, jobModel,
        ScalaJavaUtil.toScalaMap(metricsReporters),
        metricsRegistryMap,
        taskFactory,
        JobContextImpl.fromConfigWithDefaults(config, jobModel),
        Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
        Option.apply(externalContextOptional.orElse(null)),
        localityManager,
        startpointManager,
        Option.apply(diagnosticsManager.orElse(null)),
        drainMonitor);

    ProcessorLifecycleListener processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory()
        .createInstance(new ProcessorContext() { }, config);
    ClusterBasedProcessorLifecycleListener listener =
        new ClusterBasedProcessorLifecycleListener(config, processorLifecycleListener, container::shutdown);
    container.setContainerListener(listener);

    ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container,
        new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE), config);
    if (heartbeatMonitor != null) {
      heartbeatMonitor.start();
    }

    try {
      if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
        executionEnvContainerId.ifPresent(execEnvContainerId -> {
          ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager(
              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE));
          executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId);
        });
      }

      container.run();
    } finally {
      // Stop the monitor on every path once it has been started, so a failure above does not leave it running.
      if (heartbeatMonitor != null) {
        heartbeatMonitor.stop();
      }
    }

    // Check to see if the HeartbeatMonitor has set an exception before
    // overriding the value with what the listener returns
    if (containerRunnerException == null) {
      containerRunnerException = listener.getContainerException();
    }

    if (containerRunnerException != null) {
      log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
      exitCode = 1;
    }
  } catch (Throwable e) {
    /*
     * Two separate log statements are intended to print the entire stack trace as part of the logs. Using
     * single log statement with custom format requires explicitly fetching stack trace and null checks which makes
     * the code slightly hard to read in comparison with the current choice.
     */
    log.error("Exiting the process due to", e);
    log.error("Container runner exception: ", containerRunnerException);
    exitCode = 1;
  } finally {
    coordinatorStreamStore.close();
  }

  // Returned outside of the finally block: a return inside finally would discard any pending throwable.
  return exitCode;
}