in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/common/KafkaPartitionExpansionWatcher.java [166:247]
private void expandOrShrink(
Versioned<StoredJobGroup> versionedJobGroup, List<TopicPartitionInfo> partitionInfos) {
Instrumentation.instrument.returnVoidCatchThrowable(
logger,
infra.scope(),
infra.tracer(),
() -> {
boolean isChanged = false;
StoredJobGroup jobGroup = versionedJobGroup.model();
// Verify that every job_key falls in [0, expectedJobCount);
// remove any jobs that are no longer valid.
Map<Integer, StoredJob> currentJobs =
jobGroup
.getJobsList()
.stream()
.collect(Collectors.toMap(JobUtils::getJobKey, v -> v));
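// currentJobs is keyed by job key, which by construction matches the partition index.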
Map<Integer, StoredJob> newJobs = new HashMap<>();
for (int i = 0; i < partitionInfos.size(); i++) {
String jobPod = getJobPod(partitionInfos.get(i));
if (currentJobs.containsKey(i)) {
StoredJob oldJob = currentJobs.get(i);
Preconditions.checkNotNull(
oldJob, "oldJob should not be null since we just checked map contains it");
if (!jobPod.equals(oldJob.getJobPod())) {
// Update the StoredJob when the jobPod has changed; the new pod is persisted to the job store.
StoredJob newJob = StoredJob.newBuilder(oldJob).setJobPod(jobPod).build();
newJobs.put(i, newJob);
isChanged = true;
} else {
newJobs.put(i, oldJob);
}
// Remove the handled entry from currentJobs; anything left in the map
// afterwards belongs to a partition that no longer exists and will be dropped.
currentJobs.remove(i);
continue;
}
// Else, no job exists for this partition, so we must add one.
// Initialize the job state to match the job group state.
StoredJob newJob =
jobCreator.newJob(
jobGroup,
jobIdProvider.getId(
StoredJob.newBuilder()
.setJob(JobUtils.newJob(jobGroup.getJobGroup()))
.setJobPod(jobPod)
.build()),
i);
newJobs.put(i, newJob);
isChanged = true;
}
// Any jobs still left in currentJobs map to partitions that no longer exist,
// so they must be removed; flag the change so the trimmed job list is persisted.
if (!currentJobs.isEmpty()) {
isChanged = true;
}
// Skip the ZK write entirely when nothing changed.
if (!isChanged) {
return;
}
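// Rebuild the job list from newJobs only; any job left in currentJobs is dropped by omission.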
StoredJobGroup.Builder newJobGroupBuilder = StoredJobGroup.newBuilder(jobGroup);
newJobGroupBuilder.clearJobs();
newJobGroupBuilder.addAllJobs(newJobs.values());
StoredJobGroup newJobGroup = newJobGroupBuilder.build();
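// Carry the original version forward on the write so the store can detect
// concurrent modifications to this job group.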
jobGroupStore.put(
newJobGroup.getJobGroup().getJobGroupId(),
VersionedProto.from(newJobGroup, versionedJobGroup.version()));
},
"kafka-partition-expansion-watcher.expand-or-shrink",
StructuredFields.KAFKA_CLUSTER,
versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getCluster(),
StructuredFields.KAFKA_TOPIC,
versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getTopic(),
StructuredFields.KAFKA_GROUP,
versionedJobGroup.model().getJobGroup().getKafkaConsumerTaskGroup().getConsumerGroup());
}
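The core of expandOrShrink is a reconcile step: build one job per partition index in [0, partitionCount), reusing an existing job when its pod is unchanged, rewriting it when the pod moved, and dropping whatever is left over. A minimal, self-contained sketch of that pattern follows; the Job record and reconcile signature are hypothetical stand-ins for StoredJob and the method above, not the uforwarder API.

import java.util.HashMap;
import java.util.Map;

// A minimal sketch of the reconcile pattern, with a hypothetical Job record
// standing in for StoredJob. Not the uforwarder API.
final class ReconcileSketch {

  record Job(int key, String pod) {}

  // Builds the desired job map for partitionCount partitions, reusing
  // entries from current when the pod is unchanged. Returns null when
  // nothing changed, signalling the caller to skip the versioned write.
  static Map<Integer, Job> reconcile(Map<Integer, Job> current, int partitionCount, String pod) {
    Map<Integer, Job> leftover = new HashMap<>(current);
    Map<Integer, Job> desired = new HashMap<>();
    boolean changed = false;
    for (int i = 0; i < partitionCount; i++) {
      Job existing = leftover.remove(i);
      if (existing == null) {
        desired.put(i, new Job(i, pod)); // expansion: new partition, new job
        changed = true;
      } else if (!pod.equals(existing.pod())) {
        desired.put(i, new Job(i, pod)); // pod moved: rewrite the job in place
        changed = true;
      } else {
        desired.put(i, existing); // unchanged: reuse the stored job as-is
      }
    }
    // Anything left over maps to a partition that no longer exists (shrink);
    // omitting it from the desired map is what deletes it on the next write.
    if (!leftover.isEmpty()) {
      changed = true;
    }
    return changed ? desired : null;
  }

  public static void main(String[] args) {
    Map<Integer, Job> current = Map.of(0, new Job(0, "pod-a"), 1, new Job(1, "pod-a"));
    System.out.println(reconcile(current, 1, "pod-a")); // shrink: {0=Job[key=0, pod=pod-a]}
    System.out.println(reconcile(current, 2, "pod-a")); // no change: null
  }
}

The null return plays the role of the isChanged flag: the real method instead returns early and skips jobGroupStore.put, so an unchanged job group never triggers a store write.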