in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/KafkaDelayProcessManager.java [64:112]
public void pausedPartitionsAndRecords(
TopicPartition tp, List<ConsumerRecord<K, V>> unprocessedRecords) {
if (unprocessedRecords.size() == 0) {
LOGGER.error(
"The unprocessedRecords should not be empty",
StructuredLogging.kafkaGroup(consumerGroup),
StructuredLogging.kafkaTopic(tp.topic()),
StructuredLogging.kafkaPartition(tp.partition()));
return;
}
if (delayedRecords.containsKey(tp) && !delayedRecords.get(tp).isEmpty()) {
LOGGER.error(
String.format(
"The topic partition is already in delayedRecords with %s records",
delayedRecords.get(tp).size()),
StructuredLogging.kafkaGroup(consumerGroup),
StructuredLogging.kafkaTopic(tp.topic()),
StructuredLogging.kafkaPartition(tp.partition()));
scope
.tagged(
StructuredTags.builder()
.setKafkaGroup(consumerGroup)
.setKafkaTopic(tp.topic())
.setKafkaPartition(tp.partition())
.build())
.counter(MetricNames.TOPIC_PARTITION_REPEAT)
.inc(1);
Preconditions.checkState(false, "The topic partition %s is already in delayedRecords", tp);
}
kafkaConsumer.pause(Collections.singleton(tp));
delayedRecords.put(tp, unprocessedRecords);
LOGGER.info(
"kafka.pause",
StructuredLogging.kafkaGroup(consumerGroup),
StructuredLogging.kafkaTopic(tp.topic()),
StructuredLogging.kafkaPartition(tp.partition()),
StructuredLogging.reason(REASON));
scope
.tagged(
StructuredTags.builder()
.setKafkaGroup(consumerGroup)
.setKafkaTopic(tp.topic())
.setKafkaPartition(tp.partition())
.build())
.gauge(MetricNames.TOPIC_PARTITION_PAUSED)
.update(1);
}