private OffsetRange getStartEndOffsetsFromTimestamp()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/creator/BatchJobCreator.java [152:260]


  /**
   * Translates the start/end timestamps of a consumer task group into an offset range for one
   * partition of its topic.
   *
   * <p>Both timestamps are converted to epoch milliseconds and checked with {@code
   * assertValidTimestamps}. Each one is then looked up through {@code
   * adminClient.offsetsForTimes}. The end offset is resolved first because it is the fallback
   * value for the start offset; the end offset itself falls back to {@code highWatermark}. The
   * resulting pair is checked with {@code assertValidOffsets} before being returned.
   *
   * @param consumerTaskGroup source of the topic, consumer group and start/end timestamps
   * @param partition partition of the topic to resolve offsets for
   * @param highWatermark value used as the end offset when the end-timestamp lookup falls back
   * @param adminClient client used to run the {@code offsetsForTimes} lookups
   * @return a new {@code OffsetRange} built from the resolved start and end offsets
   */
  private OffsetRange getStartEndOffsetsFromTimestamp(
      KafkaConsumerTaskGroup consumerTaskGroup,
      int partition,
      long highWatermark,
      AdminClient adminClient) {
    // Convert and validate the requested time window before touching the broker.
    final long startMillis = Timestamps.toMillis(consumerTaskGroup.getStartTimestamp());
    final long endMillis = Timestamps.toMillis(consumerTaskGroup.getEndTimestamp());
    assertValidTimestamps(startMillis, endMillis);

    final String topicName = consumerTaskGroup.getTopic();
    final String groupName = consumerTaskGroup.getConsumerGroup();
    final TopicPartition targetPartition = new TopicPartition(topicName, partition);

    // Behavior of offsetForTimes that the fallbacks below rely on.
    //
    // It returns null or -1 when:
    //   1. the query timestamp > highwatermark timestamp, or
    //   2. the topic partition has no data (either due to retention or lack of use).
    //
    // When the topic partition holds at least one message:
    //   1. a query timestamp < lowwatermark timestamp resolves to the lowwatermark offset
    //      (see the topic_1 result below);
    //   2. a query timestamp within the range resolves to the first message with
    //      timestamp > queried timestamp.
    //      NOTE(review): Kafka documents offsetsForTimes as ">=" rather than ">" -- confirm.
    //
    // NOTE: this only works for kafka-client >= 2.2.1. This does NOT work correctly for the
    // kafka 1.1.1 client due to some bugs.
    //
    // Test results when querying timestamp 1 (1 ms after epoch) and 1646259569000 (March 2022)
    // in March 2020:
    //
    // Case A: lowwatermark = 96823354, highwatermark = 96823388
    //
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_1 --partitions 0 --time 1
    //   topic_1:0:96823354
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_1 --partitions 0 --time 1646259569000
    //   topic_1:0:
    //
    // Case B: lowwatermark = 0, highwatermark = 0
    // NOTE(review): the two queries below name different topics (topic_2 and topic_3) --
    // confirm which one was intended.
    //
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_2 --partitions 0 --time 1
    //   topic_2:0:
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_3 --partitions 0 --time 1646259569000
    //   topic_3:0:
    //
    // Case C: lowwatermark = 221, highwatermark = 221
    //
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_4 --partitions 0 --time 1
    //   topic_4:0:
    //   bash% ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
    //           --topic topic_4 --partitions 0 --time 1646259569000
    //   topic_4:0:

    // The end of the range must be resolved before the start: the start lookup's fallback
    // returns this value.
    final long rangeEnd =
        getOffset(
            () -> {
              Stopwatch latencyTimer = scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
              return offsetOf(
                  adminClient.offsetsForTimes(ImmutableMap.of(targetPartition, endMillis)),
                  targetPartition,
                  groupName,
                  -1,
                  latencyTimer);
            },
            () -> {
              // offsetForTimes may return a null result when the queried timestamp is beyond
              // the highwatermark timestamp; use the high watermark as the end offset then.
              logger.warn(
                  "failed to get end offset, falling back to high watermark",
                  StructuredLogging.kafkaTopic(topicName),
                  StructuredLogging.kafkaGroup(groupName),
                  StructuredLogging.kafkaPartition(partition));
              return highWatermark;
            });

    final long rangeStart =
        getOffset(
            () -> {
              Stopwatch latencyTimer = scope.timer(MetricNames.OFFSET_FOR_TIMES_LATENCY).start();
              return offsetOf(
                  adminClient.offsetsForTimes(ImmutableMap.of(targetPartition, startMillis)),
                  targetPartition,
                  groupName,
                  -1,
                  latencyTimer);
            },
            () -> {
              // Same null-result situation as above, but for the start timestamp; the start
              // offset falls back to the already-resolved end offset.
              logger.warn(
                  "failed to get start offset, falling back to end offset",
                  StructuredLogging.kafkaTopic(topicName),
                  StructuredLogging.kafkaGroup(groupName),
                  StructuredLogging.kafkaPartition(partition));
              return rangeEnd;
            });

    assertValidOffsets(rangeStart, rangeEnd);
    return new OffsetRange(rangeStart, rangeEnd);
  }