public void run()

in samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java [247:347]


  /**
   * Starts the cluster based job coordinator and blocks until it is asked to stop.
   *
   * <p>Only the first invocation does any work: the {@code isStarted} flag is flipped with a
   * compare-and-set, and any later call logs a warning and returns immediately.
   *
   * <p>The first invocation performs, in order:
   * <ol>
   *   <li>optional JMX server setup, publishing its URLs on {@code state};</li>
   *   <li>writing the diagnostics metadata file;</li>
   *   <li>creating the metadata resources and the resources of every configured state backend
   *       backup factory;</li>
   *   <li>startpoint fan-out, when {@code shouldFanoutStartpoint()} allows it;</li>
   *   <li>updating the changelog partition mapping from the current job model;</li>
   *   <li>starting the container process manager, system admins, monitors and the container
   *       placement components;</li>
   *   <li>sleeping in a loop until one of the loop conditions asks it to exit.</li>
   * </ol>
   *
   * <p>{@code onShutDown()} is invoked in a {@code finally} block once the {@code try} block has been
   * entered, whether it completes normally or fails.
   *
   * @throws SamzaException wrapping any {@link Throwable} raised after the JMX setup; the original
   *     throwable is also stored in {@code coordinatorRunException}
   */
  public void run() {
    if (!isStarted.compareAndSet(false, true)) {
      LOG.warn("Attempting to start an already started job coordinator. ");
      return;
    }
    // set up JmxServer (if jmx is enabled)
    if (isJmxEnabled) {
      jmxServer = new JmxServer();
      state.jmxUrl = jmxServer.getJmxUrl();
      state.jmxTunnelingUrl = jmxServer.getTunnelingJmxUrl();
    } else {
      jmxServer = null;
    }

    try {
      // initialize JobCoordinator state
      LOG.info("Starting cluster based job coordinator");

      // write the diagnostics metadata file
      JobConfig jobConfig = new JobConfig(config);
      // Fail with an explicit message instead of Optional.get()'s bare "No value present".
      String jobName = jobConfig.getName()
          .orElseThrow(() -> new IllegalStateException("Job name is not defined in the job config"));
      String jobId = jobConfig.getJobId();
      // CONTAINER_ID may be absent from the environment, hence the Optional.
      Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv("CONTAINER_ID"));
      DiagnosticsUtil.writeMetadataFile(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config);

      //create necessary checkpoint and changelog streams, if not created
      JobModel jobModel = jobModelManager.jobModel();
      MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
      metadataResourceUtil.createResources();

      // create all the resources required for state backend factories
      StorageConfig storageConfig = new StorageConfig(config);
      storageConfig.getBackupFactories().forEach(backupFactoryClassName -> {
        StateBackendFactory stateBackendFactory =
            ReflectionUtil.getObj(backupFactoryClassName, StateBackendFactory.class);
        StateBackendAdmin stateBackendAdmin = stateBackendFactory.getAdmin(jobModel, config);
        // Create resources required for state backend admin
        stateBackendAdmin.createResources();
        // Validate resources required for state backend admin
        stateBackendAdmin.validateResources();
      });

      /*
       * We fanout startpoint if and only if
       *  1. Startpoint is enabled in configuration
       *  2. If AM HA is enabled, fanout only if startpoint enabled and job coordinator metadata changed
       */
      if (shouldFanoutStartpoint()) {
        StartpointManager startpointManager = createStartpointManager();
        startpointManager.start();
        try {
          startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
        } finally {
          // The manager is only needed for the fan-out; stop it even if the fan-out fails.
          startpointManager.stop();
        }
      }

      // Remap changelog partitions to tasks
      Map<TaskName, Integer> prevPartitionMappings = changelogStreamManager.readPartitionMapping();

      // Collect the changelog partition id of every task in every container of the current job model.
      Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
      for (ContainerModel containerModel : jobModel.getContainers().values()) {
        for (TaskModel taskModel : containerModel.getTasks().values()) {
          taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
        }
      }

      changelogStreamManager.updatePartitionMapping(prevPartitionMappings, taskPartitionMappings);

      containerProcessManager.start();
      systemAdmins.start();
      partitionMonitor.start();
      inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::start);

      // containerPlacementRequestAllocator thread has to start after the cpm is started
      LOG.info("Starting the container placement handler thread");
      containerPlacementMetadataStore.start();
      containerPlacementRequestAllocatorThread.start();

      boolean isInterrupted = false;

      // Poll every jobCoordinatorSleepInterval until the container process manager requests shutdown,
      // checkAndThrowException() returns true (or throws), this thread is interrupted, or the
      // container placement allocator thread liveness check fails.
      while (!containerProcessManager.shouldShutdown()
          && !checkAndThrowException()
          && !isInterrupted
          && checkcontainerPlacementRequestAllocatorThreadIsAlive()) {
        try {
          Thread.sleep(jobCoordinatorSleepInterval);
        } catch (InterruptedException e) {
          isInterrupted = true;
          LOG.error("Interrupted in job coordinator loop", e);
          // Restore the interrupt flag so code further up the stack can observe it.
          Thread.currentThread().interrupt();
        }
      }
    } catch (Throwable e) {
      LOG.error("Exception thrown in the JobCoordinator loop", e);
      // Keep the original failure available on the instance before rethrowing it wrapped.
      coordinatorRunException = e;
      throw new SamzaException(e);
    } finally {
      onShutDown();
    }
  }