in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/creator/BatchJobCreator.java [51:210]
/**
 * Builds the {@link StoredJob} for one partition of the job group's topic, covering the offset
 * range that corresponds to the job group's start and end timestamps.
 *
 * <p>The work runs inside {@code Instrumentation.instrument.withRuntimeException} under the name
 * {@code creator.job.create}, tagged with cluster, topic and partition. Steps, in order:
 *
 * <ol>
 *   <li>read the start/end timestamps of the Kafka consumer task group and pass them to {@code
 *       assertValidTimestamps};
 *   <li>look up the partition's beginning and end offsets (low and high watermark);
 *   <li>if both watermarks are equal, return a job spanning exactly that offset;
 *   <li>otherwise resolve the end offset from the end timestamp (fallback: high watermark), then
 *       the start offset from the start timestamp (fallback: the resolved end offset), pass the
 *       pair to {@code assertValidOffsets} and return the job.
 * </ol>
 *
 * @param storedJobGroup job group supplying cluster, topic, consumer group and the time range
 * @param jobId id forwarded to the created job
 * @param partition partition of the topic the job is created for
 * @return the job produced by the eight-argument {@code newJob} overload
 */
public StoredJob newJob(StoredJobGroup storedJobGroup, long jobId, int partition) {
  final String clusterName =
      storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getCluster();
  final String topicName = storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getTopic();
  final String groupName =
      storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getConsumerGroup();
  return Instrumentation.instrument.withRuntimeException(
      logger,
      infra.scope(),
      infra.tracer(),
      () -> {
        TopicPartition target = new TopicPartition(topicName, partition);

        // Requested time window, converted from protobuf timestamps to epoch milliseconds.
        long windowStartMs =
            Timestamps.toMillis(
                storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getStartTimestamp());
        long windowEndMs =
            Timestamps.toMillis(
                storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getEndTimestamp());
        assertValidTimestamps(windowStartMs, windowEndMs);

        AdminClient admin = adminBuilder.build(clusterName);

        // Each timer is started right before its admin call and handed to offsetOf together
        // with the result.
        Stopwatch earliestTimer = scope.timer(MetricNames.BEGINNING_OFFSETS_LATENCY).start();
        long earliestOffset =
            offsetOf(
                admin.beginningOffsets(ImmutableList.of(target)),
                target,
                groupName,
                0,
                earliestTimer);
        Stopwatch latestTimer = scope.timer(MetricNames.END_OFFSETS_LATENCY).start();
        long latestOffset =
            offsetOf(
                admin.endOffsets(ImmutableList.of(target)), target, groupName, 0, latestTimer);

        // Equal watermarks mean the partition currently holds no messages, so the timestamp
        // lookups are skipped and the job is pinned to that single offset.
        final boolean partitionIsEmpty = earliestOffset == latestOffset;
        if (partitionIsEmpty) {
          return newJob(
              scope,
              logger,
              storedJobGroup,
              JOB_TYPE,
              jobId,
              partition,
              earliestOffset,
              latestOffset);
        }

        // Behaviour of offsetForTimes that the two lookups below rely on.
        //
        // It returns null or -1 when:
        //   1. the query timestamp > high watermark timestamp, or
        //   2. the topic partition has no data (either due to retention or lack of use).
        //
        // When the topic partition holds at least one message:
        //   1. a query timestamp < low watermark timestamp is still answered (topic_1 below
        //      returns its low watermark for --time 1);
        //   2. a query within the range returns the first message with
        //      timestamp > queried timestamp.
        //
        // NOTE: this only works for kafka-client >= 2.2.1. It does NOT work correctly with the
        // kafka 1.1.1 client due to some bugs.
        //
        // Test results when querying timestamp 1 (1 ms after epoch) and 1646259569000
        // (March 2022) in March 2020:
        //
        //   lowwatermark = 96823354, highwatermark = 96823388
        //
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_1 --partitions 0 --time 1
        //     topic_1:0:96823354
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_1 --partitions 0 --time 1646259569000
        //     topic_1:0:
        //
        //   lowwatermark = 0, highwatermark = 0
        //
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_2 --partitions 0 --time 1
        //     topic_2:0:
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_3 --partitions 0 --time 1646259569000
        //     topic_3:0:
        //
        //   lowwatermark = 221, highwatermark = 221
        //
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_4 --partitions 0 --time 1
        //     topic_4:0:
        //     bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list \
        //             localhost:9092 --topic topic_4 --partitions 0 --time 1646259569000
        //     topic_4:0:

        // The end offset is resolved first because the start-offset fallback below reuses it.
        long resolvedEnd =
            getOffset(
                () -> {
                  Stopwatch timer = scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
                  return offsetOf(
                      admin.offsetsForTimes(ImmutableMap.of(target, windowEndMs)),
                      target,
                      groupName,
                      -1,
                      timer);
                },
                () -> {
                  // A query timestamp beyond the high watermark timestamp may produce a null
                  // result; the high watermark is used as the end offset in that case.
                  logger.warn(
                      "failed to get end offset, falling back to high watermark",
                      StructuredLogging.kafkaTopic(topicName),
                      StructuredLogging.kafkaGroup(groupName),
                      StructuredLogging.kafkaPartition(partition));
                  return latestOffset;
                });
        long resolvedStart =
            getOffset(
                () -> {
                  Stopwatch timer = scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
                  return offsetOf(
                      admin.offsetsForTimes(ImmutableMap.of(target, windowStartMs)),
                      target,
                      groupName,
                      -1,
                      timer);
                },
                () -> {
                  // Same null-result case as above, this time for the start timestamp; the
                  // already resolved end offset is used as the start offset.
                  logger.warn(
                      "failed to get start offset, falling back to end offset",
                      StructuredLogging.kafkaTopic(topicName),
                      StructuredLogging.kafkaGroup(groupName),
                      StructuredLogging.kafkaPartition(partition));
                  return resolvedEnd;
                });
        assertValidOffsets(resolvedStart, resolvedEnd);
        return newJob(
            scope,
            logger,
            storedJobGroup,
            JOB_TYPE,
            jobId,
            partition,
            resolvedStart,
            resolvedEnd);
      },
      "creator.job.create",
      StructuredFields.KAFKA_CLUSTER,
      clusterName,
      StructuredFields.KAFKA_TOPIC,
      topicName,
      StructuredFields.KAFKA_PARTITION,
      Integer.toString(partition));
}