private void doOnProcessorChange()

in samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java [322:395]


  /**
   * Computes a new job model, publishes it together with a fresh barrier state and the processor
   * list to the leader blob, and then starts a scheduler that watches for barrier completion.
   *
   * <p>If, after trimming, the processor list is empty, the call is treated as coming from
   * 'onBecomeLeader' and the active processor list is read from the table instead. Otherwise, a
   * job model version in the leader blob that is newer than the local one is taken to mean the
   * previous barrier was never reached, and that barrier is published as timed out before moving on.
   *
   * <p>If any of the three blob writes fails, the processor is stopped and no barrier scheduler
   * is started.
   *
   * @param currentProcessorIds ids of the processors this change was detected with. NOTE: when the
   *     list holds more entries than {@code getMaxNumTasks()}, it is trimmed <em>in place</em>;
   *     the untrimmed snapshot is what gets published and handed to the barrier scheduler.
   * @throws NumberFormatException if a job model version that has to be incremented or compared
   *     is not a decimal integer
   */
  private void doOnProcessorChange(List<String> currentProcessorIds) {
    // if list of processors is empty - it means we are called from 'onBecomeLeader'

    // Snapshot the list before trimming; this is the list that is published and awaited on the barrier.
    List<String> initialProcessorIds = new ArrayList<>(currentProcessorIds);

    // Check if number of processors is greater than number of tasks, and if so drop the excess.
    int numTasks = getMaxNumTasks();
    int index = 0;
    while (currentProcessorIds.size() > numTasks && index < currentProcessorIds.size()) {
      if (currentProcessorIds.get(index).equals(processorId)) {
        // Never evict this processor itself; step over it so the loop always makes progress.
        index++;
      } else {
        // After removal the next candidate shifts into 'index', so the index must not advance.
        currentProcessorIds.remove(index);
      }
    }
    LOG.info("currentProcessorIds = {}", currentProcessorIds);
    LOG.info("initialProcessorIds = {}", initialProcessorIds);

    String nextJMVersion;
    String prevJMVersion = currentJMVersion.get();
    JobModel prevJobModel = jobModel;
    AtomicBoolean barrierTimeout = new AtomicBoolean(false);

    if (currentProcessorIds.isEmpty()) {
      if (currentJMVersion.get().equals(INITIAL_STATE)) {
        nextJMVersion = "1";
      } else {
        nextJMVersion = Integer.toString(Integer.parseInt(prevJMVersion) + 1);
      }
      currentProcessorIds = new ArrayList<>(table.getActiveProcessorsList(currentJMVersion));
      initialProcessorIds = currentProcessorIds;
    } else {
      //Check if previous barrier not reached, then previous barrier times out.
      String blobJMV = leaderBlob.getJobModelVersion();
      nextJMVersion = Integer.toString(Integer.parseInt(prevJMVersion) + 1);
      if (blobJMV != null && Integer.parseInt(blobJMV) > Integer.parseInt(prevJMVersion)) {
        // The blob is ahead of the local version: continue from the blob's version and job model.
        prevJMVersion = blobJMV;
        prevJobModel = leaderBlob.getJobModel();
        nextJMVersion = Integer.toString(Integer.parseInt(blobJMV) + 1);
        versionUpgradeDetected.getAndSet(false);
        // Guard against the case where no barrier scheduler has been created yet.
        if (leaderBarrierScheduler != null) {
          leaderBarrierScheduler.shutdown();
        }
        leaderBlob.publishBarrierState(BarrierState.TIMEOUT.name() + " " + blobJMV, azureLeaderElector.getLeaseId().get());
      }
    }

    // Generate the new JobModel
    GrouperMetadata grouperMetadata = new GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
    JobModel newJobModel =
        JobModelCalculator.INSTANCE.calculateJobModel(this.config, Collections.emptyMap(), streamMetadataCache,
            grouperMetadata);
    LOG.info("pid={} Generated new Job Model. Version = {}", processorId, nextJMVersion);

    // Publish the new job model
    boolean jmWrite = leaderBlob.publishJobModel(prevJobModel, newJobModel, prevJMVersion, nextJMVersion, azureLeaderElector.getLeaseId().get());
    // Publish barrier state
    boolean barrierWrite = leaderBlob.publishBarrierState(BarrierState.START.name() + " " + nextJMVersion, azureLeaderElector.getLeaseId().get());
    barrierTimeout.set(false);
    // Publish list of processors this function was called with
    boolean processorWrite = leaderBlob.publishLiveProcessorList(initialProcessorIds, azureLeaderElector.getLeaseId().get());

    //Shut down processor if write fails even after retries. These writes have an inherent retry policy.
    if (!jmWrite || !barrierWrite || !processorWrite) {
      LOG.error("Leader failed to publish the job model {}. Stopping the processor with PID: {}.", newJobModel, processorId);
      stop();
      // Nothing was (fully) published, so do not report success or start a barrier scheduler
      // on a coordinator that has just been stopped.
      return;
    }

    LOG.info("pid={} Published new Job Model. Version = {}", processorId, nextJMVersion);

    // Start scheduler to check if barrier reached
    long startTime = System.currentTimeMillis();
    leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(errorHandler, table, nextJMVersion, initialProcessorIds, startTime, barrierTimeout, currentJMVersion, processorId);
    leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout));
    leaderBarrierScheduler.scheduleTask();
  }