in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/BatchRpcUriRebalancer.java [70:132]
/**
 * Reconciles job states and job-group states for every job group in {@code jobGroups}.
 *
 * <p>For each job group, in order:
 *
 * <ol>
 *   <li>If the group's Kafka consumer task group has equal start and end timestamps (a purge
 *       request), the group is set to {@code JOB_STATE_CANCELED}. The group would be canceled
 *       eventually anyway; doing it here makes that happen early.
 *   <li>Each job that is not already CANCELED and whose stored status reports a commit offset at
 *       or beyond the job's end offset is set to CANCELED.
 *   <li>Every other job that is not already CANCELED takes on the group's state (read after
 *       step 1, so a purge propagates CANCELED to all such jobs).
 *   <li>If the group has at least one job, all of its jobs are now CANCELED, and the group itself
 *       was not already CANCELED, the group is set to CANCELED.
 * </ol>
 *
 * <p>Jobs that are already CANCELED are never moved to another state: jobs in one group may
 * reach their own end offsets at different times, and a finished job must not be re-run while it
 * waits for its siblings.
 *
 * @param jobGroups job groups to reconcile; each group is updated in place through
 *     {@code updateJob} and {@code updateJobGroupState}
 * @param workers not read by this implementation
 * @throws Exception as declared by the signature; nothing in this body throws it explicitly
 */
public void computeJobState(
    final Map<String, RebalancingJobGroup> jobGroups, final Map<Long, StoredWorker> workers)
    throws Exception {
  for (RebalancingJobGroup rebalancingJobGroup : jobGroups.values()) {
    // Purge request: start == end. Cancel the group now rather than waiting for it to be
    // canceled later. The original design note says in-flight work must be canceled so that a
    // purge can succeed within the OFFSET_COMMIT_SKEW_MS time window.
    KafkaConsumerTaskGroup kafkaConsumerTaskGroup =
        rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup();
    if (kafkaConsumerTaskGroup
        .getStartTimestamp()
        .equals(kafkaConsumerTaskGroup.getEndTimestamp())) {
      rebalancingJobGroup.updateJobGroupState(JobState.JOB_STATE_CANCELED);
    }

    // Read after the possible cancellation above so the purge case propagates CANCELED to jobs.
    JobState jobGroupState = rebalancingJobGroup.getJobGroupState();
    Map<Long, StoredJobStatus> jobStatusMap = rebalancingJobGroup.getJobStatusMap();
    boolean allJobsCanceled = true;

    for (Map.Entry<Long, StoredJob> jobEntry : rebalancingJobGroup.getJobs().entrySet()) {
      long jobId = jobEntry.getKey();
      StoredJob job = jobEntry.getValue();

      // CANCELED jobs are terminal here: they have reached their end offset and are only waiting
      // for the other jobs in the group to finish.
      if (job.getState() != JobState.JOB_STATE_CANCELED) {
        StoredJobStatus jobStatus = jobStatusMap.get(jobId);
        boolean reachedEndOffset =
            jobStatus != null
                && job.getJob().getKafkaConsumerTask().getEndOffset()
                    <= jobStatus.getJobStatus().getKafkaConsumerTaskStatus().getCommitOffset();
        if (reachedEndOffset) {
          // Priority 1: the job has committed up to its end offset, so cancel it.
          job = job.toBuilder().setState(JobState.JOB_STATE_CANCELED).build();
          rebalancingJobGroup.updateJob(jobId, job);
        } else if (job.getState() != jobGroupState) {
          // Priority 2: propagate the job-group state to the job.
          job = job.toBuilder().setState(jobGroupState).build();
          rebalancingJobGroup.updateJob(jobId, job);
        }
      }

      // `job` always holds the post-update state here, so this reflects what was just written.
      allJobsCanceled &= job.getState() == JobState.JOB_STATE_CANCELED;
    }

    // A newly created job group starts without jobs; it must not be canceled in that case.
    // Skip groups that are already CANCELED to avoid re-canceling them.
    if (!rebalancingJobGroup.getJobs().isEmpty()
        && allJobsCanceled
        && jobGroupState != JobState.JOB_STATE_CANCELED) {
      rebalancingJobGroup.updateJobGroupState(JobState.JOB_STATE_CANCELED);
    }
  }
}