public boolean handleEndOffsetAndDelay()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/RetryTopicKafkaFetcher.java [125:190]


  public boolean handleEndOffsetAndDelay(
      ConsumerRecord<byte[], byte[]> consumerRecord,
      Job job,
      CheckpointManager checkpointManager,
      PipelineStateManager pipelineStateManager)
      throws InterruptedException {
    // a boolean indicating whether the caller needs to process the remaining messages for the
    // given job or not. True means the caller does not need to, false means otherwise.
    boolean stopProcessingRemaining = false;
    // If the job is no longer assigned to this pipeline, there
    // is no need to process the remaining records.
    if (!pipelineStateManager.shouldJobBeRunning(job)) {
      stopProcessingRemaining = true;
    } else {
      // handle the processing delay
      long processingDelayMs = getProcessingDelayMs(job);
      if (processingDelayMs > 0) {
        long deadline = consumerRecord.timestamp() + processingDelayMs;
        long waitingTimeMs = deadline - System.currentTimeMillis();
        KafkaConsumerTask kafkaConsumerTask = job.getKafkaConsumerTask();
        // wait until the expected delay is reached or the job was removed.
        while (waitingTimeMs > 0 && pipelineStateManager.shouldJobBeRunning(job)) {
          // log exceptionally long (>6H) waiting
          if (waitingTimeMs > 21600000) {
            LOGGER.info(
                "long-waiting-messages",
                StructuredLogging.kafkaCluster(kafkaConsumerTask.getCluster()),
                StructuredLogging.kafkaGroup(kafkaConsumerTask.getConsumerGroup()),
                StructuredLogging.kafkaTopic(kafkaConsumerTask.getTopic()),
                StructuredLogging.kafkaPartition(kafkaConsumerTask.getPartition()),
                StructuredLogging.kafkaOffset(consumerRecord.offset()));
          }

          final long finalWaitingTimeMs = waitingTimeMs;
          Instrumentation.instrument.withException(
              LOGGER,
              infra.scope(),
              infra.tracer(),
              () -> {
                lock.lock();
                try {
                  return processingDelayReached.await(finalWaitingTimeMs, TimeUnit.MILLISECONDS);
                } finally {
                  lock.unlock();
                }
              },
              r -> !r,
              "message-processing-wait",
              StructuredTags.KAFKA_CLUSTER,
              kafkaConsumerTask.getCluster(),
              StructuredTags.KAFKA_GROUP,
              kafkaConsumerTask.getConsumerGroup(),
              StructuredTags.KAFKA_TOPIC,
              kafkaConsumerTask.getTopic(),
              StructuredTags.KAFKA_PARTITION,
              Integer.toString(kafkaConsumerTask.getPartition()));
          // update the waitingTimeMs in case await is terminated by signal.
          waitingTimeMs = deadline - System.currentTimeMillis();
        }
        // check again: if the job is no longer assigned to this pipeline, there
        // is no need to process the remaining records.
        stopProcessingRemaining = !pipelineStateManager.shouldJobBeRunning(job);
      }
    }
    return stopProcessingRemaining;
  }