public StoredJob newJob()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/creator/BatchJobCreator.java [51:210]


  public StoredJob newJob(StoredJobGroup storedJobGroup, long jobId, int partition) {
    // Creates the batch job for one partition of the job group's topic.
    //
    //   storedJobGroup - supplies the cluster, topic, consumer group and the start/end timestamps
    //   jobId          - forwarded unchanged to the delegate newJob overload
    //   partition      - partition of the topic whose offset range is resolved here
    //
    // The start/end timestamps are translated into a start/end offset pair, which is handed to
    // the delegate newJob overload together with JOB_TYPE. The whole lookup runs inside the
    // instrumentation wrapper, tagged with cluster, topic and partition.
    final String topic = storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getTopic();
    final String cluster = storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getCluster();
    final String consumerGroup =
        storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getConsumerGroup();
    return Instrumentation.instrument.withRuntimeException(
        logger,
        infra.scope(),
        infra.tracer(),
        () -> {
          // Resolve and validate the requested time window before touching Kafka.
          long startMs =
              Timestamps.toMillis(
                  storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getStartTimestamp());
          long endMs =
              Timestamps.toMillis(
                  storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getEndTimestamp());
          assertValidTimestamps(startMs, endMs);

          TopicPartition tp = new TopicPartition(topic, partition);
          AdminClient admin = adminBuilder.build(cluster);

          // Each timer is started before its admin call is issued and is then handed to offsetOf
          // (presumably offsetOf stops it once the result is available -- confirm in offsetOf).
          Stopwatch lowMarkTimer = scope.timer(MetricNames.BEGINNING_OFFSETS_LATENCY).start();
          long lowMark =
              offsetOf(
                  admin.beginningOffsets(ImmutableList.of(tp)), tp, consumerGroup, 0, lowMarkTimer);
          Stopwatch highMarkTimer = scope.timer(MetricNames.END_OFFSETS_LATENCY).start();
          long highMark =
              offsetOf(admin.endOffsets(ImmutableList.of(tp)), tp, consumerGroup, 0, highMarkTimer);

          // Equal watermarks mean the partition currently holds no messages: skip the timestamp
          // lookups and create the job directly on the watermarks.
          if (highMark == lowMark) {
            return newJob(
                scope, logger, storedJobGroup, JOB_TYPE, jobId, partition, lowMark, highMark);
          }

          // Behaviour of offsetsForTimes that the lookups below rely on. This holds for
          // kafka-client >= 2.2.1 only; the 1.1.1 client has bugs in this area.
          //
          //   * No result (null or -1) when the queried timestamp is newer than the message at
          //     the high watermark, or when the partition has no data (retention or lack of use).
          //   * With at least one message in the partition:
          //       - a timestamp older than the low watermark's message yields the low watermark;
          //       - a timestamp inside the range yields the first message at or after it.
          //
          // Recorded runs (March 2020) of
          //   ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092
          //       --topic <topic> --partitions 0 --time <ts>
          // with ts = 1 (1 ms after epoch) and ts = 1646259569000 (March 2022, i.e. the future):
          //
          //   lowwatermark = 96823354, highwatermark = 96823388
          //     --topic topic_1 --time 1              ->  topic_1:0:96823354
          //     --topic topic_1 --time 1646259569000  ->  topic_1:0:
          //
          //   lowwatermark = 0, highwatermark = 0
          //     --topic topic_2 --time 1              ->  topic_2:0:
          //     --topic topic_3 --time 1646259569000  ->  topic_3:0:
          //     (topic_2 / topic_3 is copied as recorded; probably the same topic -- verify)
          //
          //   lowwatermark = 221, highwatermark = 221
          //     --topic topic_4 --time 1              ->  topic_4:0:
          //     --topic topic_4 --time 1646259569000  ->  topic_4:0:

          // The end offset is resolved first because the start lookup falls back to it.
          long rangeEnd =
              getOffset(
                  () -> {
                    Stopwatch lookupTimer =
                        scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
                    return offsetOf(
                        admin.offsetsForTimes(ImmutableMap.of(tp, endMs)),
                        tp,
                        consumerGroup,
                        -1,
                        lookupTimer);
                  },
                  () -> {
                    // No offset for the end timestamp (e.g. it lies beyond the newest message):
                    // read up to the high watermark instead.
                    logger.warn(
                        "failed to get end offset, falling back to high watermark",
                        StructuredLogging.kafkaTopic(topic),
                        StructuredLogging.kafkaGroup(consumerGroup),
                        StructuredLogging.kafkaPartition(partition));
                    return highMark;
                  });
          long rangeStart =
              getOffset(
                  () -> {
                    Stopwatch lookupTimer =
                        scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
                    return offsetOf(
                        admin.offsetsForTimes(ImmutableMap.of(tp, startMs)),
                        tp,
                        consumerGroup,
                        -1,
                        lookupTimer);
                  },
                  () -> {
                    // No offset for the start timestamp (e.g. it lies beyond the newest message):
                    // start at the resolved end offset.
                    logger.warn(
                        "failed to get start offset, falling back to end offset",
                        StructuredLogging.kafkaTopic(topic),
                        StructuredLogging.kafkaGroup(consumerGroup),
                        StructuredLogging.kafkaPartition(partition));
                    return rangeEnd;
                  });

          assertValidOffsets(rangeStart, rangeEnd);

          return newJob(
              scope, logger, storedJobGroup, JOB_TYPE, jobId, partition, rangeStart, rangeEnd);
        },
        "creator.job.create",
        StructuredFields.KAFKA_CLUSTER,
        cluster,
        StructuredFields.KAFKA_TOPIC,
        topic,
        StructuredFields.KAFKA_PARTITION,
        Integer.toString(partition));
  }