void commitOffsets()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [414:453]


  void commitOffsets(Map<TopicPartition, Job> allTopicPartitionJobMap) {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    if (allTopicPartitionJobMap.isEmpty()) {
      return;
    }
    long currentTimestampMs = System.currentTimeMillis();
    // we don't need to make lastCommitOffsetsCheckTimeMs thread-safe, as this number does not need
    // to be
    // accurate
    if (currentTimestampMs - lastCommitOffsetsCheckTimeMs > config.getOffsetCommitIntervalMs()) {
      lastCommitOffsetsCheckTimeMs = currentTimestampMs;
      try {
        Map<TopicPartition, OffsetAndMetadata> tpCommitInfoMap = new HashMap<>();
        allTopicPartitionJobMap.forEach(
            (tp, job) -> {
              long offsetToCommit = checkpointManager.getOffsetToCommit(job);
              long committedOffset = checkpointManager.getCommittedOffset(job);
              if (eligibleToCommit(offsetToCommit, committedOffset)) {
                tpCommitInfoMap.put(tp, new OffsetAndMetadata(offsetToCommit));
              }
            });
        if (tpCommitInfoMap.isEmpty()) {
          return;
        }
        String consumerGroup =
            pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup();
        String topic = pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic();
        if (asyncCommitOffset) {
          commitAsync(tpCommitInfoMap, allTopicPartitionJobMap, consumerGroup, topic);
        } else {
          commitSync(tpCommitInfoMap, allTopicPartitionJobMap, consumerGroup, topic);
        }
        lastCommitTimestampMs = currentTimestampMs;
      } catch (Throwable throwable) {
        // we catch the error and continue the work
        LOGGER.error("failed to commit offsets to kafka servers", throwable);
        scope.counter(MetricNames.OFFSET_COMMIT_EXCEPTION).inc(1);
      }
    }
  }