in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [236:369]
public void doWork() {
Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
Preconditions.checkNotNull(processorSink, "processor sink required");
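// if the downstream processor sink is no longer running, there is nothing to fetch for:
// shut this fetcher thread down and release its resources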
if (!processorSink.isRunning()) {
initiateShutdown();
cleanup();
return;
}
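// report a "fetcher is working" heartbeat gauge, tagged with the cluster, consumer group, and
// topic of the current job template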
scope
.tagged(
StructuredTags.builder()
.setKafkaCluster(
pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getCluster())
.setKafkaGroup(
pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup())
.setKafkaTopic(
pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic())
.build())
.gauge(MetricNames.FETCHER_KAFKA_WORKING)
.update(1.0);
// step 1: discover topic partition changes
Map<TopicPartition, Job> addedTopicPartitionJobMap = new HashMap<>();
Map<TopicPartition, Job> removedTopicPartitionJobMap = new HashMap<>();
Map<TopicPartition, Job> allTopicPartitionJobMap = new HashMap<>();
// extractTopicPartitionMap modifies the three passed-in maps; therefore, do not make them
// immutable.
boolean assignmentChange =
extractTopicPartitionMap(
addedTopicPartitionJobMap, removedTopicPartitionJobMap, allTopicPartitionJobMap);
// step 2: handle newly-added topic partitions. Specifically,
// (1) start tracking them in the checkpoint manager,
// (2) extract topic partitions with a non-negative start offset for further processing, and
// (3) fetch the committed offsets from the broker and put them into the checkpoint manager.
Map<TopicPartition, OffsetAndMetadata> brokerCommittedOffset = new HashMap<>();
if (!addedTopicPartitionJobMap.isEmpty()) {
brokerCommittedOffset = getBrokerCommittedOffset(addedTopicPartitionJobMap.keySet());
}
Map<TopicPartition, Long> topicPartitionOffsetMap =
addToCheckPointManager(addedTopicPartitionJobMap, brokerCommittedOffset);
// step 2.1: track throughput and inflight messages for the newly added and the removed
// topic partitions
adjustTracker(addedTopicPartitionJobMap, removedTopicPartitionJobMap);
// use kafkaConsumerLock to ensure thread-safe access to kafkaConsumer and in-order enqueueing
// of ConsumerRecords
try {
// step 3: perform topic partition addition and deletion
// step 3.1: update assignment
if (assignmentChange) {
LOGGER.info(
"assignment changed",
StructuredLogging.kafkaTopicPartitions(allTopicPartitionJobMap.keySet()));
kafkaConsumer.assign(allTopicPartitionJobMap.keySet());
// delete removed jobs from delayProcessManager
delayProcessManager.delete(removedTopicPartitionJobMap.keySet());
// Resume all topic partitions in case they were paused before, except those paused by
// delayProcessManager, which are resumed separately in its resumeTopicPartitions function.
// Without this resume, previously paused topic partitions that are assigned to this worker
// again would stay paused, and we could not fetch messages from them.
List<TopicPartition> pausedTopicPartitions = delayProcessManager.getAll();
Collection<TopicPartition> topicPartitions =
allTopicPartitionJobMap
.keySet()
.stream()
.filter(tp -> !pausedTopicPartitions.contains(tp))
.collect(Collectors.toSet());
kafkaConsumer.resume(topicPartitions);
}
// step 3.2: seek start offsets when necessary
seekStartOffset(topicPartitionOffsetMap, allTopicPartitionJobMap);
// step 4: commit offsets
commitOffsets(allTopicPartitionJobMap);
// step 5: poll messages
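// note: pollMessages may return null (see the @Nullable annotation), as may the merged
// result below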
@Nullable ConsumerRecords<K, V> records = pollMessages(allTopicPartitionJobMap);
// step 6: get resumed topic partitions
Map<TopicPartition, List<ConsumerRecord<K, V>>> resumedRecords =
delayProcessManager.resumePausedPartitionsAndRecords();
// step 7: merge resumed records with new polled records
@Nullable ConsumerRecords<K, V> mergedRecords = mergeRecords(records, resumedRecords);
// step 8: update the actual running job status here; because we use async commit, offsets are
// committed to the Kafka brokers while we are polling.
updateActualRunningJobStatus();
// step 9: process messages
processFetchedData(mergedRecords, allTopicPartitionJobMap);
// step 10: log and report metrics
logTopicPartitionOffsetInfo(allTopicPartitionJobMap);
} catch (Throwable e) {
// TODO (T4575853): recreate kafka consumer if kafka consumer is closed.
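// report a "fetcher is not working" gauge tagged with the error, so failures stay visible in
// metrics even when the exception itself is not logged (e.g. TopicAuthorizationException below)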
scope
.tagged(
StructuredTags.builder()
.setKafkaCluster(
pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getCluster())
.setKafkaGroup(
pipelineStateManager
.getJobTemplate()
.getKafkaConsumerTask()
.getConsumerGroup())
.setKafkaTopic(
pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic())
.setError(e)
.build())
.gauge(MetricNames.FETCHER_KAFKA_NOT_WORKING)
.update(1.0);
// don't log authorization exceptions because they would be emitted in large volume
if (!(e instanceof TopicAuthorizationException)) {
LOGGER.error("failed to doWork", e);
}
// reset offset to fetchOffset to avoid data loss
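// (a negative fetch offset presumably means no valid checkpoint has been recorded yet, so the
// seek is skipped for that partition)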
for (Map.Entry<TopicPartition, Job> entry : allTopicPartitionJobMap.entrySet()) {
if (checkpointManager.getCheckpointInfo(entry.getValue()).getFetchOffset() >= 0) {
kafkaConsumer.seek(
entry.getKey(),
checkpointManager.getCheckpointInfo(entry.getValue()).getFetchOffset());
}
}
}
}