public void seekStartOffset()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [793:898]


  /**
   * Positions the Kafka consumer for the given topic partitions and records the resulting offsets
   * in the checkpoint manager.
   *
   * <p>Returns immediately when {@code seekOffsetTaskMap} is empty. Otherwise the beginning and end
   * offsets of every requested partition are fetched from the consumer, and each entry is handled
   * as follows:
   *
   * <ul>
   *   <li>Entries whose partition has no job in {@code topicPartitionJobMap}, or whose requested
   *       offset is negative, are skipped.
   *   <li>A missing beginning/end offset, or a requested offset outside the {@code [beginning, end]}
   *       range, is reported through {@code logStartOffsetInfo} with the matching metric name.
   *   <li>{@code getSeekStartOffsetOption} then picks the action: seek to the requested offset, seek
   *       to the beginning, seek to the end, or do not seek at all. When not seeking, the
   *       consumer's committed offset (if any) is stored as both committed and fetch offset;
   *       otherwise {@code handleNoCommittedOffsetWhenDoNotSeekPolicy} is invoked.
   * </ul>
   *
   * @param seekOffsetTaskMap requested start offset for each topic partition
   * @param topicPartitionJobMap job associated with each topic partition; supplies the auto offset
   *     reset policy and is the key under which offsets are stored in the checkpoint manager
   */
  public void seekStartOffset(
      Map<TopicPartition, Long> seekOffsetTaskMap, Map<TopicPartition, Job> topicPartitionJobMap) {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    if (seekOffsetTaskMap.isEmpty()) {
      return;
    }
    LOGGER.info(
        "seek start offsets",
        StructuredLogging.kafkaTopicPartitions(topicPartitionJobMap.keySet()));
    Map<TopicPartition, Long> beginningOffsets =
        kafkaConsumer.beginningOffsets(seekOffsetTaskMap.keySet());
    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(seekOffsetTaskMap.keySet());
    for (Map.Entry<TopicPartition, Long> entry : seekOffsetTaskMap.entrySet()) {
      TopicPartition tp = entry.getKey();
      long offset = entry.getValue();
      // Resolve the job once; every branch below needs it.
      Job job = topicPartitionJobMap.get(tp);
      if (job == null || offset < 0) {
        // Nothing to do for partitions without a job or without a non-negative start offset.
        continue;
      }

      Long earliestOffset = beginningOffsets.get(tp);
      Long latestOffset = endOffsets.get(tp);
      // Read the consumer group once so the metrics and the debug line below agree.
      String consumerGroup =
          pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup();

      // Report a missing lower bound, or a requested offset below it.
      if (earliestOffset == null) {
        logStartOffsetInfo(
            consumerGroup,
            tp.topic(),
            tp.partition(),
            offset,
            MetricNames.TOPIC_PARTITION_SEEK_START_NULL);
      } else if (earliestOffset > offset) {
        logStartOffsetInfo(
            consumerGroup,
            tp.topic(),
            tp.partition(),
            offset,
            MetricNames.TOPIC_PARTITION_SEEK_TOO_SMALL);
      }
      // Report a missing upper bound, or a requested offset above it.
      if (latestOffset == null) {
        logStartOffsetInfo(
            consumerGroup,
            tp.topic(),
            tp.partition(),
            offset,
            MetricNames.TOPIC_PARTITION_SEEK_END_NULL);
      } else if (latestOffset < offset) {
        logStartOffsetInfo(
            consumerGroup,
            tp.topic(),
            tp.partition(),
            offset,
            MetricNames.TOPIC_PARTITION_SEEK_TOO_LARGE);
      }

      switch (getSeekStartOffsetOption(
          offset,
          earliestOffset,
          latestOffset,
          job.getKafkaConsumerTask().getAutoOffsetResetPolicy())) {
        case SEEK_TO_SPECIFIED_OFFSET:
          kafkaConsumer.seek(tp, offset);
          checkpointManager.setFetchOffset(job, offset);
          break;
        case SEEK_TO_EARLIEST_OFFSET:
          kafkaConsumer.seekToBeginning(Collections.singleton(tp));
          if (earliestOffset != null) {
            // It is okay to leave the fetch offset unchanged when the bound is unknown: it will
            // eventually be overridden when messages are being processed.
            checkpointManager.setFetchOffset(job, earliestOffset);
          }
          break;
        case SEEK_TO_LATEST_OFFSET:
          kafkaConsumer.seekToEnd(Collections.singleton(tp));
          if (latestOffset != null) {
            // It is okay to leave the fetch offset unchanged when the bound is unknown: it will
            // eventually be overridden when messages are being processed.
            checkpointManager.setFetchOffset(job, latestOffset);
          }
          break;
        case DO_NOT_SEEK:
          // Do not seek means going to the committed offset.
          // Although we don't need to seek, we still need to set the offsets.
          OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
          if (committedOffset != null) {
            checkpointManager.setCommittedOffset(job, committedOffset.offset());
            checkpointManager.setFetchOffset(job, committedOffset.offset());
          } else {
            handleNoCommittedOffsetWhenDoNotSeekPolicy(kafkaConsumer, checkpointManager, job, tp);
          }
          break;
        default:
          // do nothing
      }
      LOGGER.debug(
          "seek kafka consumer",
          StructuredLogging.kafkaGroup(consumerGroup),
          StructuredLogging.kafkaTopic(tp.topic()),
          StructuredLogging.kafkaPartition(tp.partition()),
          StructuredLogging.kafkaOffset(offset));
    }
  }