public void postProcess()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/BatchRpcUriRebalancer.java [150:238]


  /**
   * Post-rebalance hook that commits end offsets back to Kafka for job groups that recently
   * finished, so downstream DLQ lag reports reflect the purged/merged state.
   *
   * <p>No-op when offset committing is disabled via dynamic configuration — kept as a quick
   * mitigation switch for production. Job groups whose end timestamp is older than
   * {@code OFFSET_COMMIT_SKEW_MS} are skipped; see inline comments for the rationale.
   *
   * @param jobGroups job groups to post-process, keyed by job group id
   * @param workers current workers keyed by worker id (not used by this implementation)
   */
  public void postProcess(
      Map<String, RebalancingJobGroup> jobGroups, Map<Long, StoredWorker> workers) {
    // Dynamic config for turning it off. Although from the comments below we know it's safe to turn
    // off, we want to have a way for quick mitigation, if anything happens in production.
    if (!dynamicConfiguration.isOffsetCommittingEnabled()) {
      return;
    }
    Stopwatch postProcessStopwatch = scope.timer(MetricNames.POST_PROCESS_LATENCY).start();
    try {
      long currentTimeMs = System.currentTimeMillis();
      for (RebalancingJobGroup rebalancingJobGroup : jobGroups.values()) {
        Timestamp endTimestamp =
            rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getEndTimestamp();
        long endTimestampMs = Timestamps.toMillis(endTimestamp);
        // try to keep committing offsets for a certain time period so that it can eventually
        // succeed.
        // (1) we keep committing for a certain time period because the committed offset might be
        //     override by another purge/merge operation, which will eventually be deleted.
        // (2) we don't keep committing forever because
        //     (a) messages will be purged after a certain time period, we don't need to do it
        // anymore
        //     (b) there might be too many committing work, which might take unacceptable long time.
        if (currentTimeMs - endTimestampMs > OFFSET_COMMIT_SKEW_MS) {
          continue;
        }
        // TODO(qichao): https://t3.uberinternal.com/browse/KAFEP-1263
        // The following code for committing the offset is no longer needed due to
        // recent fixes in KCP DLQ purge. KCP now uses worker to commit the offset.
        String topic = rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getTopic();
        Map<TopicPartition, OffsetAndMetadata> partitionAndOffsetToCommit =
            collectEndOffsetsToCommit(rebalancingJobGroup, topic);
        // commit offsets to kafka clusters
        if (!partitionAndOffsetToCommit.isEmpty()) {
          commitOffsets(rebalancingJobGroup, topic, partitionAndOffsetToCommit);
        }
      }
    } finally {
      postProcessStopwatch.stop();
    }
  }

  /**
   * Collects, per partition, the end offset of every canceled job that has fully consumed its
   * assigned range (startOffset == endOffset with a positive endOffset).
   */
  private Map<TopicPartition, OffsetAndMetadata> collectEndOffsetsToCommit(
      RebalancingJobGroup rebalancingJobGroup, String topic) {
    Map<TopicPartition, OffsetAndMetadata> partitionAndOffsetToCommit = new HashMap<>();
    for (StoredJob job : rebalancingJobGroup.getJobs().values()) {
      long startOffset = job.getJob().getKafkaConsumerTask().getStartOffset();
      long endOffset = job.getJob().getKafkaConsumerTask().getEndOffset();
      if (endOffset > 0
          && startOffset == endOffset
          && job.getState() == JobState.JOB_STATE_CANCELED) {
        partitionAndOffsetToCommit.put(
            new TopicPartition(topic, job.getJob().getKafkaConsumerTask().getPartition()),
            new OffsetAndMetadata(endOffset));
      }
    }
    return partitionAndOffsetToCommit;
  }

  /**
   * Commits the given offsets for the job group's consumer group, logging and counting failures.
   * Failures are swallowed deliberately (best-effort commit); they may only skew DLQ lag reports.
   */
  private void commitOffsets(
      RebalancingJobGroup rebalancingJobGroup,
      String topic,
      Map<TopicPartition, OffsetAndMetadata> partitionAndOffsetToCommit) {
    // Fix: start/stop via try-finally so the latency timer is also stopped on the failure path;
    // previously a thrown exception left the stopwatch running and the latency unrecorded.
    Stopwatch stopwatch = scope.timer(MetricNames.OFFSET_COMMIT_LATENCY).start();
    try {
      KafkaConsumerTaskGroup taskGroup =
          rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup();
      AdminClient client = adminBuilder.build(taskGroup.getCluster());
      // NOTE(review): alterConsumerGroupOffsets is asynchronous — its result future is not
      // awaited here, so failures surfaced via the future never reach the catch block below.
      // Confirm whether blocking on the result (result.all().get()) is acceptable in this path.
      client.alterConsumerGroupOffsets(taskGroup.getConsumerGroup(), partitionAndOffsetToCommit);
    } catch (Exception e) {
      // failed to commit offset might lead to wrong dlq lag reports.
      // we need to add metrics and alerts.
      logger.warn(
          MetricNames.OFFSET_COMMIT_FAILURE,
          StructuredLogging.kafkaCluster(
              rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getCluster()),
          StructuredLogging.kafkaGroup(
              rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getConsumerGroup()),
          StructuredLogging.kafkaTopic(topic),
          e);
      scope
          .tagged(
              StructuredTags.builder()
                  .setKafkaCluster(
                      rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getCluster())
                  .setKafkaGroup(
                      rebalancingJobGroup
                          .getJobGroup()
                          .getKafkaConsumerTaskGroup()
                          .getConsumerGroup())
                  .setKafkaTopic(topic)
                  .build())
          .counter(MetricNames.OFFSET_COMMIT_FAILURE)
          .inc(1);
    } finally {
      stopwatch.stop();
    }
  }