in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/BatchRpcUriRebalancer.java [70:132]
  public void computeJobState(
      final Map<String, RebalancingJobGroup> jobGroups, final Map<Long, StoredWorker> workers)
      throws Exception {
    // Reconciles job-level and group-level states for every job group in jobGroups:
    //   - a group whose start and end timestamps are equal is canceled up front;
    //   - a non-canceled job whose commit offset has reached its end offset is canceled;
    //   - any other non-canceled job is aligned with the state of its group;
    //   - a non-empty group whose jobs are all canceled is canceled as well.
    // The workers argument is not read by this method.
    for (RebalancingJobGroup group : jobGroups.values()) {
      // The job group would eventually be canceled anyway; comparing the start and end
      // timestamps here makes sure that it happens early.
      KafkaConsumerTaskGroup taskGroup = group.getJobGroup().getKafkaConsumerTaskGroup();
      boolean sameStartAndEnd = taskGroup.getStartTimestamp().equals(taskGroup.getEndTimestamp());
      if (sameStartAndEnd) {
        group.updateJobGroupState(JobState.JOB_STATE_CANCELED);
      }

      // Read the group state only after the possible cancellation above. This snapshot is used
      // for the rest of the iteration.
      final JobState groupState = group.getJobGroupState();
      final Map<Long, StoredJobStatus> statusByJobId = group.getJobStatusMap();
      boolean everyJobCanceled = true;

      for (Map.Entry<Long, StoredJob> entry : group.getJobs().entrySet()) {
        final long jobId = entry.getKey();
        final StoredJob current = entry.getValue();
        final StoredJobStatus status = statusByJobId.get(jobId);

        // A job that is already CANCELED is left alone: multiple jobs within a job group may
        // enter the CANCELED state at different times, as each one reaches its own end offset,
        // and those jobs must not be re-run.
        if (current.getState() == JobState.JOB_STATE_CANCELED) {
          continue;
        }

        // Priority 1: cancel the job once its commit offset has reached the end offset. A job
        // without a reported status cannot be judged and falls through to priority 2.
        // Note carried over from the previous comment: a purge request (start == end) relies on
        // existing work being canceled so that it can succeed within the OFFSET_COMMIT_SKEW_MS
        // time window.
        boolean reachedEndOffset =
            status != null
                && current.getJob().getKafkaConsumerTask().getEndOffset()
                    <= status.getJobStatus().getKafkaConsumerTaskStatus().getCommitOffset();
        if (reachedEndOffset) {
          StoredJob canceled = current.toBuilder().setState(JobState.JOB_STATE_CANCELED).build();
          group.updateJob(jobId, canceled);
          continue;
        }

        // Priority 2: propagate the job group state to a job that is not CANCELED and whose
        // state differs from the group's.
        if (current.getState() != groupState) {
          group.updateJob(jobId, current.toBuilder().setState(groupState).build());
        }

        // The job entered this iteration in a non-CANCELED state and did not reach its end
        // offset, so it keeps the group from being auto-canceled below.
        everyJobCanceled = false;
      }

      // Cancel the job group when all of its jobs are canceled, with two exceptions:
      //   - a newly created job group is initialized without jobs and must not be canceled;
      //   - a job group that is already canceled is not canceled again.
      boolean hasJobs = !group.getJobs().isEmpty();
      if (hasJobs && everyJobCanceled && groupState != JobState.JOB_STATE_CANCELED) {
        group.updateJobGroupState(JobState.JOB_STATE_CANCELED);
      }
    }
  }