public abstract SeekStartOffsetOption getSeekStartOffsetOption()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [930:977]


  public abstract SeekStartOffsetOption getSeekStartOffsetOption(
      long specifiedOffset,
      @Nullable Long earliestOffset,
      @Nullable Long latestOffset,
      AutoOffsetResetPolicy autoOffsetResetPolicy);

  @Nullable
  ConsumerRecords<K, V> pollMessages(Map<TopicPartition, Job> allTopicPartitionJobMap)
      throws InterruptedException {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    @Nullable ConsumerRecords<K, V> records = null;
    if (!allTopicPartitionJobMap.isEmpty()) {
      allTopicPartitionJobMap.forEach(
          (tp, job) -> {
            LOGGER.debug(
                "kafka.poll",
                StructuredLogging.jobId(job.getJobId()),
                StructuredLogging.kafkaCluster(job.getKafkaConsumerTask().getCluster()),
                StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
                StructuredLogging.kafkaPartition(job.getKafkaConsumerTask().getPartition()));
          });
      Stopwatch kafkaPollTimer =
          scope
              .tagged(
                  StructuredTags.builder()
                      .setKafkaGroup(
                          pipelineStateManager
                              .getJobTemplate()
                              .getKafkaConsumerTask()
                              .getConsumerGroup())
                      .build())
              .timer(MetricNames.KAFKA_POLL_LATENCY)
              .start();
      records = kafkaConsumer.poll(java.time.Duration.ofMillis(pollTimeoutMs));
      kafkaPollTimer.stop();
      LOGGER.debug(
          "kafka.poll.success", StructuredLogging.count(records == null ? 0 : records.count()));
    } else {
      LOGGER.debug("will not fetch messages", StructuredLogging.reason("there is no job assigned"));
      try {
        Thread.sleep(FETCHER_IDLE_SLEEP_MS);
      } catch (InterruptedException e) {
        LOGGER.info("Sleep was interrupted", e);
      }
      records = ConsumerRecords.empty();
    }
    return records;
  }