in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [605:692]
boolean extractTopicPartitionMap(
Map<TopicPartition, Job> newTopicPartitionJobMap,
Map<TopicPartition, Job> removedTopicPartitionJobMap,
Map<TopicPartition, Job> allTopicPartitionJobMap) {
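// Computes the delta between the previously running jobs and the expected running jobs:
// populates the three output maps and returns true if the job assignment changed.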
Preconditions.checkNotNull(pipelineStateManager, "pipeline state manager required");
// Use a snapshot of the expectedRunningJobMap as it might change.
Map<Long, Job> expectedRunningJobMap = pipelineStateManager.getExpectedRunningJobMap();
// step 1: snapshot the old job set and the old topic-partition set from currentRunningJobMap.
Map<Long, Job> oldJobs = ImmutableMap.copyOf(currentRunningJobMap);
Set<TopicPartition> oldTPs = new HashSet<>();
currentRunningJobMap.forEach(
(jobId, job) -> {
TopicPartition topicPartition =
new TopicPartition(
job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
oldTPs.add(topicPartition);
});
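// refresh the cached running-job map so the next invocation compares against this snapshot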
currentRunningJobMap.clear();
currentRunningJobMap.putAll(expectedRunningJobMap);
// step 2: build allTopicPartitionJobMap from the expected running jobs.
expectedRunningJobMap.forEach(
(jobId, job) -> {
// TODO (T4367183): currently, if more than one job has the same TopicPartition,
// we arbitrarily keep the first one seen. Need to think carefully about how to merge such jobs.
TopicPartition topicPartition =
new TopicPartition(
job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
if (allTopicPartitionJobMap.containsKey(topicPartition)) {
LOGGER.error(
"two jobs have the same topic partition",
StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
StructuredLogging.kafkaTopic(job.getKafkaConsumerTask().getTopic()),
StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()),
StructuredLogging.reason(
"two job ids: "
+ job.getJobId()
+ ", "
+ allTopicPartitionJobMap.get(topicPartition).getJobId()));
scope
.tagged(
StructuredTags.builder()
.setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
.setKafkaTopic(job.getKafkaConsumerTask().getTopic())
.setKafkaPartition(job.getKafkaConsumerTask().getPartition())
.build())
.gauge(MetricNames.TOPIC_PARTITION_DUP_JOB)
.update(1.0);
} else {
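// the first job seen for this topic-partition wins; later duplicates are only logged and counted above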
allTopicPartitionJobMap.put(topicPartition, job);
}
});
// step 3: check for new topic partitions
for (Map.Entry<TopicPartition, Job> entry : allTopicPartitionJobMap.entrySet()) {
// no need to re-seek offsets for topic-partitions that are already assigned
if (!oldTPs.contains(entry.getKey())) {
newTopicPartitionJobMap.put(entry.getKey(), entry.getValue());
}
}
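// collect newly added jobs (expected now but not previously running) for command logging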
List<Job> runJobs = new ArrayList<>();
for (Map.Entry<Long, Job> jobEntry : expectedRunningJobMap.entrySet()) {
if (!oldJobs.containsKey(jobEntry.getKey())) {
runJobs.add(jobEntry.getValue());
}
}
// step 4: add jobs to cancel (previously running but no longer expected) to removedTopicPartitionJobMap.
List<Job> cancelJobs = new ArrayList<>();
for (Map.Entry<Long, Job> jobEntry : oldJobs.entrySet()) {
if (!expectedRunningJobMap.containsKey(jobEntry.getKey())) {
Job cancelJob = jobEntry.getValue();
cancelJobs.add(cancelJob);
removedTopicPartitionJobMap.put(
new TopicPartition(
cancelJob.getKafkaConsumerTask().getTopic(),
cancelJob.getKafkaConsumerTask().getPartition()),
cancelJob);
}
}
logCommands(runJobs, cancelJobs);
// step 5: Check for job assignment changes.
return !runJobs.isEmpty() || !cancelJobs.isEmpty();
}
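
A minimal caller sketch, assuming hypothetical surrounding code: it only illustrates the contract of extractTopicPartitionMap (the three output maps are filled in by the call, and the returned boolean signals an assignment change). The follow-up steps named in the comments are assumptions, not the actual uforwarder call sites.

// Hypothetical usage sketch (not part of AbstractKafkaFetcherThread);
// assumes java.util.HashMap and org.apache.kafka.common.TopicPartition are imported.
Map<TopicPartition, Job> newTopicPartitionJobMap = new HashMap<>();
Map<TopicPartition, Job> removedTopicPartitionJobMap = new HashMap<>();
Map<TopicPartition, Job> allTopicPartitionJobMap = new HashMap<>();
boolean assignmentChanged =
    extractTopicPartitionMap(
        newTopicPartitionJobMap, removedTopicPartitionJobMap, allTopicPartitionJobMap);
if (assignmentChanged) {
  // assumed follow-up: re-assign the consumer to allTopicPartitionJobMap.keySet()
  // and seek offsets only for partitions in newTopicPartitionJobMap
}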