in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [930:977]
/**
 * Decides where the fetcher should start consuming a partition, given the offset requested by
 * the job, the earliest/latest offsets currently available on the broker (either may be null),
 * and the job's auto offset reset policy.
 */
public abstract SeekStartOffsetOption getSeekStartOffsetOption(
    long specifiedOffset,
    @Nullable Long earliestOffset,
    @Nullable Long latestOffset,
    AutoOffsetResetPolicy autoOffsetResetPolicy);
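// --- Illustrative sketch (not part of this file) ---
// A minimal example of how a concrete fetcher subclass might implement
// getSeekStartOffsetOption: seek to the requested offset while the broker still retains it,
// otherwise fall back to the auto offset reset policy. The enum constants used below
// (SEEK_TO_SPECIFIED_OFFSET, SEEK_TO_EARLIEST_OFFSET, SEEK_TO_LATEST_OFFSET, DO_NOT_SEEK,
// EARLIEST, LATEST) are assumed names for illustration and may differ from the actual
// definitions in this repo.
@Override
public SeekStartOffsetOption getSeekStartOffsetOption(
    long specifiedOffset,
    @Nullable Long earliestOffset,
    @Nullable Long latestOffset,
    AutoOffsetResetPolicy autoOffsetResetPolicy) {
  // Seek directly when the requested offset falls inside the broker's retained range.
  if (earliestOffset != null
      && latestOffset != null
      && specifiedOffset >= earliestOffset
      && specifiedOffset <= latestOffset) {
    return SeekStartOffsetOption.SEEK_TO_SPECIFIED_OFFSET;
  }
  // Otherwise defer to the configured reset policy.
  switch (autoOffsetResetPolicy) {
    case EARLIEST:
      return SeekStartOffsetOption.SEEK_TO_EARLIEST_OFFSET;
    case LATEST:
      return SeekStartOffsetOption.SEEK_TO_LATEST_OFFSET;
    default:
      return SeekStartOffsetOption.DO_NOT_SEEK;
  }
}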

@Nullable
ConsumerRecords<K, V> pollMessages(Map<TopicPartition, Job> allTopicPartitionJobMap)
    throws InterruptedException {
  Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
  @Nullable ConsumerRecords<K, V> records = null;
  if (!allTopicPartitionJobMap.isEmpty()) {
    // Log every topic partition that is about to be polled, tagged with its job metadata.
    allTopicPartitionJobMap.forEach(
        (tp, job) -> {
          LOGGER.debug(
              "kafka.poll",
              StructuredLogging.jobId(job.getJobId()),
              StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
              StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
              StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
        });
    // Time the poll so its latency is reported per consumer group.
    Stopwatch kafkaPollTimer =
        scope
            .tagged(
                StructuredTags.builder()
                    .setKafkaGroup(
                        pipelineStateManager
                            .getJobTemplate()
                            .getKafkaConsumerTask()
                            .getConsumerGroup())
                    .build())
            .timer(MetricNames.KAFKA_POLL_LATENCY)
            .start();
    records = kafkaConsumer.poll(java.time.Duration.ofMillis(pollTimeoutMs));
    kafkaPollTimer.stop();
    LOGGER.debug(
        "kafka.poll.success", StructuredLogging.count(records == null ? 0 : records.count()));
  } else {
    // No jobs are assigned to this pipeline: sleep briefly instead of busy-polling the consumer.
    LOGGER.debug("will not fetch messages", StructuredLogging.reason("there is no job assigned"));
    try {
      Thread.sleep(FETCHER_IDLE_SLEEP_MS);
    } catch (InterruptedException e) {
      LOGGER.info("Sleep was interrupted", e);
    }
    records = ConsumerRecords.empty();
  }
  return records;
}
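
// --- Illustrative usage (not part of this file) ---
// A rough sketch of how a fetcher run loop might drive pollMessages: look up the jobs
// currently assigned to this pipeline, poll the consumer, and hand any non-empty batch off
// for processing. isRunning, getAssignedTopicPartitionJobs, and processRecords are
// hypothetical helper names used only for this example.
void fetchLoopSketch() throws InterruptedException {
  while (isRunning()) {
    Map<TopicPartition, Job> assigned = getAssignedTopicPartitionJobs();
    ConsumerRecords<K, V> records = pollMessages(assigned);
    if (records != null && !records.isEmpty()) {
      processRecords(records);
    }
  }
}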