def getJob()

in samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala [51:158]


  /**
   * Builds a [[ThreadJob]] wrapping a single SamzaContainer with container id "0".
   *
   * In order, this method:
   *  1. initialises a coordinator stream store and derives the job model from it,
   *  2. writes the task -> changelog-partition mapping back through the changelog stream manager,
   *  3. creates the metadata resources (checkpoint and changelog streams),
   *  4. fans out startpoints when startpoints are enabled in the job config,
   *  5. optionally creates a drain monitor and a JMX server, depending on the job config,
   *  6. builds the container, attaches a listener that forwards lifecycle events to the
   *     application's processor lifecycle listener, and wraps the container in a ThreadJob.
   *
   * The job model manager is started only for the duration of container construction; it and the
   * JMX server (if any) are stopped before this method returns, whether or not it succeeds.
   *
   * @param submissionConfig configuration used for every component created here
   * @return a ThreadJob wrapping the constructed container
   */
  def getJob(submissionConfig: Config): StreamJob = {
    info("Creating a ThreadJob, which is only meant for debugging.")
    val config = submissionConfig

    val metricsRegistry = new MetricsRegistryMap()
    val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, metricsRegistry)
    coordinatorStreamStore.init()

    val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))

    val jobModelManager = JobModelManager(config, changelogStreamManager.readPartitionMapping(),
      coordinatorStreamStore, metricsRegistry)
    val jobModel = jobModelManager.jobModel

    // Collect the changelog partition id of every task in every container of the job model.
    val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
    for (containerModel <- jobModel.getContainers.values) {
      for (taskModel <- containerModel.getTasks.values) {
        taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
      }
    }

    changelogStreamManager.writePartitionMapping(taskPartitionMappings)

    // Create necessary checkpoint and changelog streams.
    val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
    metadataResourceUtil.createResources()

    val jobConfig = new JobConfig(config)

    if (jobConfig.getStartpointEnabled()) {
      // Fan out the startpoints; the manager is stopped even if the fan-out fails.
      val startpointManager = new StartpointManager(coordinatorStreamStore)
      startpointManager.start()
      try {
        startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
      } finally {
        startpointManager.stop()
      }
    }

    // SamzaContainer receives the monitor as a plain DrainMonitor, so "disabled" stays null.
    val drainMonitor: DrainMonitor =
      if (jobConfig.getDrainMonitorEnabled()) {
        new DrainMonitor(coordinatorStreamStore, config, jobConfig.getDrainMonitorPollIntervalMillis)
      } else {
        null
      }

    val containerId = "0"
    val jmxServer: Option[JmxServer] =
      if (jobConfig.getJMXEnabled) Some(new JmxServer()) else None

    // Everything after the JMX server is created runs inside this try, so a failure anywhere in
    // the remaining setup still reaches the stop call below.
    // NOTE(review): assumes a constructed JmxServer holds something that stop releases - confirm.
    try {
      val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
      val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)

      // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
      JavaOptionals.toRichOptional(new ShellCommandConfig(config).getTaskOpts).toOption.foreach { _ =>
        warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " +
          "You probably want to run %s=%s." format(ShellCommandConfig.TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS,
          classOf[ProcessJobFactory].getName))
      }

      // Forwards every container lifecycle callback to the application's processor lifecycle listener.
      val containerListener = {
        val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config)
        new SamzaContainerListener {
          override def beforeStart(): Unit = {
            processorLifecycleListener.beforeStart()
          }

          override def afterStart(): Unit = {
            processorLifecycleListener.afterStart()
          }

          override def afterStop(): Unit = {
            processorLifecycleListener.afterStop()
          }

          override def afterFailure(t: Throwable): Unit = {
            // Notify the application first, then rethrow the failure to the caller of the listener.
            processorLifecycleListener.afterFailure(t)
            throw t
          }
        }
      }

      try {
        jobModelManager.start
        val container = SamzaContainer(
          containerId,
          jobModel,
          Map[String, MetricsReporter](),
          metricsRegistry,
          taskFactory,
          JobContextImpl.fromConfigWithDefaults(config, jobModel),
          Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
          Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
          buildExternalContext(config),
          drainMonitor = drainMonitor)
        container.setContainerListener(containerListener)

        new ThreadJob(container)
      } finally {
        // Stopped only on paths where start was at least attempted, as in the original ordering.
        jobModelManager.stop
      }
    } finally {
      // Runs after jobModelManager.stop (same order as before), and now also when that stop
      // or any earlier setup step throws.
      jmxServer.foreach(_.stop)
    }
  }