in samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala [51:158]
def getJob(submissionConfig: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
var config = submissionConfig
val metricsRegistry = new MetricsRegistryMap()
val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry)
coordinatorStreamStore.init()
val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
val jobModelManager = JobModelManager(config, changelogStreamManager.readPartitionMapping(),
coordinatorStreamStore, metricsRegistry)
val jobModel = jobModelManager.jobModel
val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
for (containerModel <- jobModel.getContainers.values) {
for (taskModel <- containerModel.getTasks.values) {
taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
}
}
changelogStreamManager.writePartitionMapping(taskPartitionMappings)
//create necessary checkpoint and changelog streams
val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
metadataResourceUtil.createResources()
val jobConfig = new JobConfig(config)
if (jobConfig.getStartpointEnabled()) {
// fan out the startpoints
val startpointManager = new StartpointManager(coordinatorStreamStore)
startpointManager.start()
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
} finally {
startpointManager.stop()
}
}
var drainMonitor: DrainMonitor = null
if (jobConfig.getDrainMonitorEnabled()) {
drainMonitor = new DrainMonitor(coordinatorStreamStore, config, jobConfig.getDrainMonitorPollIntervalMillis)
}
val containerId = "0"
var jmxServer: JmxServer = null
if (jobConfig.getJMXEnabled) {
jmxServer = new JmxServer()
}
val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
JavaOptionals.toRichOptional(new ShellCommandConfig(config).getTaskOpts).toOption match {
case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " +
"You probably want to run %s=%s." format(ShellCommandConfig.TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS,
classOf[ProcessJobFactory].getName))
case _ => None
}
val containerListener = {
val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config)
new SamzaContainerListener {
override def afterFailure(t: Throwable): Unit = {
processorLifecycleListener.afterFailure(t)
throw t
}
override def afterStart(): Unit = {
processorLifecycleListener.afterStart()
}
override def afterStop(): Unit = {
processorLifecycleListener.afterStop()
}
override def beforeStart(): Unit = {
processorLifecycleListener.beforeStart()
}
}
}
try {
jobModelManager.start
val container = SamzaContainer(
containerId,
jobModel,
Map[String, MetricsReporter](),
metricsRegistry,
taskFactory,
JobContextImpl.fromConfigWithDefaults(config, jobModel),
Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
buildExternalContext(config),
drainMonitor = drainMonitor)
container.setContainerListener(containerListener)
val threadJob = new ThreadJob(container)
threadJob
} finally {
jobModelManager.stop
if (jmxServer != null) {
jmxServer.stop
}
}
}