in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [1011:1129]
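/**
 * Hands fetched records to the processor sink, one partition at a time, updating checkpoints,
 * in-flight tracking, delay handling, and per-record metrics along the way.
 */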
void processFetchedData(
    @Nullable ConsumerRecords<K, V> consumerRecords, Map<TopicPartition, Job> taskMap)
    throws InterruptedException {
  Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
  Preconditions.checkNotNull(processorSink, "sink required");
  if (taskMap.size() == 0 || consumerRecords == null || consumerRecords.isEmpty()) {
    return;
  }
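  // Process the fetched data partition by partition; each partition is expected to map to a Job
  // in taskMap.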
  Set<TopicPartition> partitions = consumerRecords.partitions();
  for (TopicPartition tp : partitions) {
    List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
    Job job = taskMap.get(tp);
    if (records.size() != 0) {
      // make sure the job is not null; if it is missing, pause the partition and seek back to the
      // offset of the first fetched record
      if (job == null) {
        pausePartitionAndSeekOffset(tp, records.get(0).offset());
        continue;
      }
      final int size = records.size();
      long lastProcessedOffset = records.get(size - 1).offset();
      for (int i = 0; i < size; ++i) {
        ConsumerRecord<K, V> record = records.get(i);
        // FIXME: this is a workaround for the retry queue duplication problem.
        // https://t3.uberinternal.com/browse/KAFEP-1522
        // The long-term solution is a separate thread for periodic offset commits, but that is a
        // complicated change. Committing offsets here makes commits more frequent, so fewer
        // messages get reprocessed when a worker shuffle happens.
        if (perRecordCommit
            && (lastCommitOffsetsCheckTimeMs == -1L
                || (System.currentTimeMillis() - lastCommitOffsetsCheckTimeMs
                    > ACTIVE_COMMIT_INTERVAL_IN_MS))) {
          commitOffsets(taskMap);
        }
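        // handleEndOffsetAndDelay returning true means the remaining records of this partition
        // should not be processed now (for example, an end offset has been reached); roll the
        // processed offset back to just before this record, pause the partition at this offset,
        // and stop iterating.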
        if (handleEndOffsetAndDelay(record, job, checkpointManager, pipelineStateManager)) {
          lastProcessedOffset = record.offset() - 1;
          pausePartitionAndSeekOffset(tp, record.offset());
          break;
        }
        if (delayProcessManager.shouldDelayProcess(record)) {
          delayProcessManager.pausedPartitionsAndRecords(tp, records.subList(i, size));
          lastProcessedOffset = record.offset() - 1;
          break;
        }
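        // report how far this record lags behind: the gap between the current wall-clock time and
        // the record timestamp.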
        scope
            .tagged(
                StructuredTags.builder()
                    .setKafkaGroup(
                        pipelineStateManager
                            .getJobTemplate()
                            .getKafkaConsumerTask()
                            .getConsumerGroup())
                    .setKafkaTopic(tp.topic())
                    .setKafkaPartition(tp.partition())
                    .build())
            .gauge(MetricNames.TOPIC_PARTITION_DELAY_TIME)
            .update(System.currentTimeMillis() - record.timestamp());
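        // mark the record as in-flight; it is removed again in the completion callback below.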
        inflightMessageTracker.addMessage(tp, record.serializedValueSize());
        // process the message
        final Scope scopeWithGroupTopicPartition =
            scope.tagged(
                StructuredTags.builder()
                    .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
                    .setKafkaTopic(record.topic())
                    .setKafkaPartition(record.partition())
                    .build());
        final Timer messageProcessTimer =
            scopeWithGroupTopicPartition.timer(MetricNames.MESSAGE_PROCESS_LATENCY);
        long startNanoTime = System.nanoTime();
        TracedConsumerRecord<K, V> tracedRecord =
            TracedConsumerRecord.of(
                record, infra.tracer(), job.getKafkaConsumerTask().getConsumerGroup());
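        // submit the record to the processor asynchronously; the future completes with the offset
        // to commit, or exceptionally if processing failed.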
        processorSink
            .submit(ItemAndJob.of(tracedRecord, job))
            .toCompletableFuture()
            .whenComplete(
                (aLong, throwable) -> {
                  messageProcessTimer.record(Duration.between(startNanoTime, System.nanoTime()));
                  if (throwable != null) {
                    // because the fetcher side does not have a retry mechanism, the processor
                    // should handle all errors and never return an error to the fetcher
                    LOGGER.error(
                        "failed to process a record",
                        StructuredLogging.jobId(job.getJobId()),
                        StructuredLogging.kafkaGroup(
                            job.getKafkaConsumerTask().getConsumerGroup()),
                        StructuredLogging.kafkaTopic(record.topic()),
                        StructuredLogging.kafkaPartition(record.partition()),
                        StructuredLogging.kafkaOffset(record.offset()),
                        throwable);
                    scopeWithGroupTopicPartition
                        .counter(MetricNames.MESSAGE_PROCESS_FAILURE)
                        .inc(1);
                  } else {
                    // this setOffsetToCommit will keep the largest offset to commit.
                    checkpointManager.setOffsetToCommit(job, aLong);
                    // update throughput
                    throughputTracker.record(job, 1, record.serializedValueSize());
                  }
                  inflightMessageTracker.removeMessage(tp, record.serializedValueSize());
                  tracedRecord.complete(aLong, throwable);
                });
      }
      // set the next fetch offset only after the current records have been enqueued successfully;
      // when a fetch request fails, the fetcher resets to this fetchOffset to avoid data loss.
      checkpointManager.setFetchOffset(job, lastProcessedOffset + 1);
    } else {
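      // no records were fetched for this partition; if there is no job for it either, pause the
      // partition (the -1 offset presumably means there is no specific offset to seek back to).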
      if (job == null) {
        pausePartitionAndSeekOffset(tp, -1L);
      }
    }
  }
}