public void pausedPartitionsAndRecords()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/KafkaDelayProcessManager.java [64:112]


  /**
   * Pauses consumption of a topic partition and holds back the records that were not processed.
   *
   * <p>An empty {@code unprocessedRecords} list is reported as an error and otherwise ignored: the
   * partition is neither paused nor registered. If the partition already has a non-empty entry in
   * {@code delayedRecords}, the situation is logged, counted under {@code
   * MetricNames.TOPIC_PARTITION_REPEAT}, and rejected with an exception before anything is paused.
   *
   * @param tp the topic partition to pause
   * @param unprocessedRecords the records to hold for this partition; expected to be non-empty
   * @throws IllegalStateException if the partition already has delayed records
   */
  public void pausedPartitionsAndRecords(
      TopicPartition tp, List<ConsumerRecord<K, V>> unprocessedRecords) {
    // Nothing to delay: report the unexpected call and leave the partition untouched.
    if (unprocessedRecords.isEmpty()) {
      LOGGER.error(
          "The unprocessedRecords should not be empty",
          StructuredLogging.kafkaGroup(consumerGroup),
          StructuredLogging.kafkaTopic(tp.topic()),
          StructuredLogging.kafkaPartition(tp.partition()));
      return;
    }

    // A partition must not be registered twice while it still holds delayed records.
    if (delayedRecords.containsKey(tp)) {
      int alreadyDelayed = delayedRecords.get(tp).size();
      if (alreadyDelayed > 0) {
        LOGGER.error(
            String.format(
                "The topic partition is already in delayedRecords with %s records",
                alreadyDelayed),
            StructuredLogging.kafkaGroup(consumerGroup),
            StructuredLogging.kafkaTopic(tp.topic()),
            StructuredLogging.kafkaPartition(tp.partition()));
        scope
            .tagged(
                StructuredTags.builder()
                    .setKafkaGroup(consumerGroup)
                    .setKafkaTopic(tp.topic())
                    .setKafkaPartition(tp.partition())
                    .build())
            .counter(MetricNames.TOPIC_PARTITION_REPEAT)
            .inc(1);
        throw new IllegalStateException(
            String.format("The topic partition %s is already in delayedRecords", tp));
      }
    }

    // Pause first, then remember the records that still have to be processed.
    kafkaConsumer.pause(Collections.singleton(tp));
    delayedRecords.put(tp, unprocessedRecords);

    LOGGER.info(
        "kafka.pause",
        StructuredLogging.kafkaGroup(consumerGroup),
        StructuredLogging.kafkaTopic(tp.topic()),
        StructuredLogging.kafkaPartition(tp.partition()),
        StructuredLogging.reason(REASON));

    // Mark the partition as paused in the per-partition gauge.
    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaGroup(consumerGroup)
                .setKafkaTopic(tp.topic())
                .setKafkaPartition(tp.partition())
                .build())
        .gauge(MetricNames.TOPIC_PARTITION_PAUSED)
        .update(1);
  }