in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/RetryTopicKafkaFetcher.java [125:190]
public boolean handleEndOffsetAndDelay(
ConsumerRecord<byte[], byte[]> consumerRecord,
Job job,
CheckpointManager checkpointManager,
PipelineStateManager pipelineStateManager)
throws InterruptedException {
// a boolean indicating whether the caller needs to process the remaining messages for the
// given job or not. True means the caller does not need to, false means otherwise.
boolean stopProcessingRemaining = false;
// If the job is no longer assigned to this pipeline, there
// is no need to process the remaining records.
if (!pipelineStateManager.shouldJobBeRunning(job)) {
stopProcessingRemaining = true;
} else {
// handle the processing delay
long processingDelayMs = getProcessingDelayMs(job);
if (processingDelayMs > 0) {
long deadline = consumerRecord.timestamp() + processingDelayMs;
long waitingTimeMs = deadline - System.currentTimeMillis();
KafkaConsumerTask kafkaConsumerTask = job.getKafkaConsumerTask();
// wait until the expected delay is reached or the job was removed.
while (waitingTimeMs > 0 && pipelineStateManager.shouldJobBeRunning(job)) {
// log exceptionally long (>6H) waiting
if (waitingTimeMs > 21600000) {
LOGGER.info(
"long-waiting-messages",
StructuredLogging.kafkaCluster(kafkaConsumerTask.getCluster()),
StructuredLogging.kafkaGroup(kafkaConsumerTask.getConsumerGroup()),
StructuredLogging.kafkaTopic(kafkaConsumerTask.getTopic()),
StructuredLogging.kafkaPartition(kafkaConsumerTask.getPartition()),
StructuredLogging.kafkaOffset(consumerRecord.offset()));
}
final long finalWaitingTimeMs = waitingTimeMs;
Instrumentation.instrument.withException(
LOGGER,
infra.scope(),
infra.tracer(),
() -> {
lock.lock();
try {
return processingDelayReached.await(finalWaitingTimeMs, TimeUnit.MILLISECONDS);
} finally {
lock.unlock();
}
},
r -> !r,
"message-processing-wait",
StructuredTags.KAFKA_CLUSTER,
kafkaConsumerTask.getCluster(),
StructuredTags.KAFKA_GROUP,
kafkaConsumerTask.getConsumerGroup(),
StructuredTags.KAFKA_TOPIC,
kafkaConsumerTask.getTopic(),
StructuredTags.KAFKA_PARTITION,
Integer.toString(kafkaConsumerTask.getPartition()));
// update the waitingTimeMs in case await is terminated by signal.
waitingTimeMs = deadline - System.currentTimeMillis();
}
// check again: if the job is no longer assigned to this pipeline, there
// is no need to process the remaining records.
stopProcessingRemaining = !pipelineStateManager.shouldJobBeRunning(job);
}
}
return stopProcessingRemaining;
}