private void onCommitCompletion()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [551:602]


  /**
   * Records the outcome of an offset commit, one topic partition at a time.
   *
   * <p>When {@code exception} is present, every entry of {@code offsets} is logged at error level
   * together with the exception, and {@code MetricNames.OFFSET_COMMIT_FAILURE} is incremented on a
   * scope tagged with the consumer group, topic and partition.
   *
   * <p>When {@code exception} is empty, each entry of {@code offsets} whose topic partition has a
   * job in {@code allTopicPartitionJobMap} has its offset handed to {@code
   * checkpointManager.setCommittedOffset}, is logged at debug level, and is reported through
   * {@code MetricNames.OFFSET_COMMIT_SUCCESS} and the {@code MetricNames.OFFSET} gauge. Entries
   * without a job in that map are skipped entirely (no checkpoint update, log line or metric).
   *
   * @param offsets offsets that were part of the commit, keyed by topic partition
   * @param exception the commit failure, or empty when the commit succeeded
   * @param allTopicPartitionJobMap lookup from topic partition to the job whose checkpoint is
   *     updated on success
   * @param consumerGroup consumer group name used in log fields and metric tags
   */
  private void onCommitCompletion(
      Map<TopicPartition, OffsetAndMetadata> offsets,
      Optional<Exception> exception,
      Map<TopicPartition, Job> allTopicPartitionJobMap,
      String consumerGroup) {
    // Failure path first: report every partition that was part of the failed commit.
    if (exception.isPresent()) {
      final Exception cause = exception.get();
      for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        final TopicPartition topicPartition = entry.getKey();
        LOGGER.error(
            "failed to commit offset",
            StructuredLogging.kafkaGroup(consumerGroup),
            StructuredLogging.kafkaTopic(topicPartition.topic()),
            StructuredLogging.kafkaPartition(topicPartition.partition()),
            StructuredLogging.kafkaOffset(entry.getValue().offset()),
            cause);

        final Scope failureScope =
            scope.tagged(
                StructuredTags.builder()
                    .setKafkaGroup(consumerGroup)
                    .setKafkaTopic(topicPartition.topic())
                    .setKafkaPartition(topicPartition.partition())
                    .build());
        failureScope.counter(MetricNames.OFFSET_COMMIT_FAILURE).inc(1);
      }
      return;
    }

    // Success path.
    LOGGER.debug("committed offsets", StructuredLogging.count(offsets.size()));
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
      final TopicPartition topicPartition = entry.getKey();
      final Job job = allTopicPartitionJobMap.get(topicPartition);
      if (job == null) {
        // No job is registered for this partition, so there is nothing to checkpoint or report.
        continue;
      }

      final long committedOffset = entry.getValue().offset();
      checkpointManager.setCommittedOffset(job, committedOffset);
      LOGGER.debug(
          "successfully committed offset",
          StructuredLogging.kafkaGroup(consumerGroup),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.kafkaOffset(committedOffset));

      final Scope successScope =
          scope.tagged(
              StructuredTags.builder()
                  .setKafkaGroup(consumerGroup)
                  .setKafkaTopic(topicPartition.topic())
                  .setKafkaPartition(topicPartition.partition())
                  .build());
      successScope.counter(MetricNames.OFFSET_COMMIT_SUCCESS).inc(1);
      successScope.gauge(MetricNames.OFFSET).update(committedOffset);
    }
  }