in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/creator/BatchJobCreator.java [54:115]
/**
 * Creates a {@link StoredJob} for one partition of the job group's Kafka topic.
 *
 * <p>Start/end offsets for the new job are resolved, in priority order:
 * <ol>
 *   <li>the partition watermarks, when the partition holds no messages
 *       (low watermark equals high watermark);</li>
 *   <li>explicit per-partition offset ranges configured on the job group, when present;</li>
 *   <li>otherwise, offsets looked up from the group's configured timestamps.</li>
 * </ol>
 *
 * <p>The whole creation is wrapped in {@code Instrumentation.instrument.withRuntimeException},
 * tagged with the cluster/topic/partition, so failures surface as runtime exceptions with
 * metrics and tracing attached.
 *
 * @param storedJobGroup the job group providing the Kafka consumer task configuration
 * @param jobId identifier to assign to the new job
 * @param partition Kafka partition the job will consume
 * @return the newly created job for the given partition
 */
public StoredJob newJob(StoredJobGroup storedJobGroup, long jobId, int partition) {
  // Resolve the Kafka coordinates for this job from the group's consumer task config.
  KafkaConsumerTaskGroup taskGroup = storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup();
  String cluster = taskGroup.getCluster();
  String topic = taskGroup.getTopic();
  String group = taskGroup.getConsumerGroup();
  TopicPartition tp = new TopicPartition(topic, partition);
  return Instrumentation.instrument.withRuntimeException(
      logger,
      infra.scope(),
      infra.tracer(),
      () -> {
        AdminClient admin = adminBuilder.build(cluster);
        OffsetRange watermarks = getPartitionLowAndHighWatermarks(tp, group, admin);
        // Empty partition (no messages between the watermarks): use the watermarks directly.
        if (watermarks.start() == watermarks.end()) {
          return newJob(scope, logger, storedJobGroup, JOB_TYPE, jobId, partition, watermarks);
        }
        boolean hasExplicitOffsets =
            !taskGroup.getPartitionOffsetRanges().getPartitionOffsetRangeList().isEmpty();
        if (hasExplicitOffsets) {
          // Explicit per-partition offset ranges were configured; honor them over timestamps.
          logger.info(
              "Using partition offsets for job creation",
              StructuredLogging.kafkaTopic(topic),
              StructuredLogging.kafkaGroup(group),
              StructuredLogging.kafkaPartition(partition));
          return newJob(
              scope,
              logger,
              storedJobGroup,
              JOB_TYPE,
              jobId,
              partition,
              getStartEndOffsetsFromPartitionOffsets(
                  taskGroup.getPartitionOffsetRanges(), partition));
        }
        // No explicit offsets: resolve start/end offsets from the configured timestamps.
        return newJob(
            scope,
            logger,
            storedJobGroup,
            JOB_TYPE,
            jobId,
            partition,
            getStartEndOffsetsFromTimestamp(taskGroup, partition, watermarks.end(), admin));
      },
      "creator.job.create",
      StructuredFields.KAFKA_CLUSTER,
      cluster,
      StructuredFields.KAFKA_TOPIC,
      topic,
      StructuredFields.KAFKA_PARTITION,
      Integer.toString(partition));
}