void processFetchedData()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [1011:1129]


  void processFetchedData(
      @Nullable ConsumerRecords<K, V> consumerRecords, Map<TopicPartition, Job> taskMap)
      throws InterruptedException {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
    Preconditions.checkNotNull(processorSink, "sink required");
    if (taskMap.size() == 0 || consumerRecords == null || consumerRecords.isEmpty()) {
      return;
    }

    Set<TopicPartition> partitions = consumerRecords.partitions();
    for (TopicPartition tp : partitions) {
      List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
      Job job = taskMap.get(tp);

      if (records.size() != 0) {
        // if there is no job assigned for this partition, pause it and skip its records
        if (job == null) {
          pausePartitionAndSeekOffset(tp, records.get(0).offset());
          continue;
        }

        final int size = records.size();
        long lastProcessedOffset = records.get(size - 1).offset();
        for (int i = 0; i < size; ++i) {
          ConsumerRecord<K, V> record = records.get(i);
          // FIXME: this is a workaround for the retry queue duplication problem.
          // https://t3.uberinternal.com/browse/KAFEP-1522
          // The long-term solution is a dedicated thread that commits offsets periodically,
          // but that is a complicated change. Committing offsets here makes commits more
          // frequent, so fewer messages are reprocessed when a worker shuffle happens.
          if (perRecordCommit
              && (lastCommitOffsetsCheckTimeMs == -1L
                  || (System.currentTimeMillis() - lastCommitOffsetsCheckTimeMs
                      > ACTIVE_COMMIT_INTERVAL_IN_MS))) {
            commitOffsets(taskMap);
          }
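          // if the record should not be processed now (for example, the job's end offset
          // has been reached), pause the partition at this record and stop processing the
          // rest of the batch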
          if (handleEndOffsetAndDelay(record, job, checkpointManager, pipelineStateManager)) {
            lastProcessedOffset = record.offset() - 1;
            pausePartitionAndSeekOffset(tp, record.offset());
            break;
          }
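          // if the record is not yet due for processing, park the remainder of the batch
          // with the delay process manager and stop processing this partition for now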
          if (delayProcessManager.shouldDelayProcess(record)) {
            delayProcessManager.pausedPartitionsAndRecords(tp, records.subList(i, size));
            lastProcessedOffset = record.offset() - 1;
            break;
          }

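          // report end-to-end delay for this partition: now minus the record's timestamp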
          scope
              .tagged(
                  StructuredTags.builder()
                      .setKafkaGroup(
                          pipelineStateManager
                              .getJobTemplate()
                              .getKafkaConsumerTask()
                              .getConsumerGroup())
                      .setKafkaTopic(tp.topic())
                      .setKafkaPartition(tp.partition())
                      .build())
              .gauge(MetricNames.TOPIC_PARTITION_DELAY_TIME)
              .update(System.currentTimeMillis() - record.timestamp());

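          // count the record as in-flight until the processor completes it; it is removed
          // in the completion callback below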
          inflightMessageTracker.addMessage(tp, record.serializedValueSize());
          // process the message
          final Scope scopeWithGroupTopicPartition =
              scope.tagged(
                  StructuredTags.builder()
                      .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
                      .setKafkaTopic(record.topic())
                      .setKafkaPartition(record.partition())
                      .build());
          final Timer messageProcessTimer =
              scopeWithGroupTopicPartition.timer(MetricNames.MESSAGE_PROCESS_LATENCY);
          long startNanoTime = System.nanoTime();
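          // wrap the record with tracing metadata for the consumer group; the trace is
          // completed in the callback below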
          TracedConsumerRecord<K, V> tracedRecord =
              TracedConsumerRecord.of(
                  record, infra.tracer(), job.getKafkaConsumerTask().getConsumerGroup());
          processorSink
              .submit(ItemAndJob.of(tracedRecord, job))
              .toCompletableFuture()
              .whenComplete(
                  (aLong, throwable) -> {
                    messageProcessTimer.record(
                        Duration.ofNanos(System.nanoTime() - startNanoTime));
                    if (throwable != null) {
                      // because the fetcher side does not have a retry mechanism, the processor
                      // should handle all errors and never return an error to the fetcher
                      LOGGER.error(
                          "failed to process a record",
                          StructuredLogging.jobId(job.getJobId()),
                          StructuredLogging.kafkaGroup(
                              job.getKafkaConsumerTask().getConsumerGroup()),
                          StructuredLogging.kafkaTopic(record.topic()),
                          StructuredLogging.kafkaPartition(record.partition()),
                          StructuredLogging.kafkaOffset(record.offset()),
                          throwable);
                      scopeWithGroupTopicPartition
                          .counter(MetricNames.MESSAGE_PROCESS_FAILURE)
                          .inc(1);
                    } else {
                      // this setOffsetToCommit will keep the largest offset to commit.
                      checkpointManager.setOffsetToCommit(job, aLong);
                      // update throughput
                      throughputTracker.record(job, 1, record.serializedValueSize());
                    }
                    inflightMessageTracker.removeMessage(tp, record.serializedValueSize());
                    tracedRecord.complete(aLong, throwable);
                  });
        }
        // Set the next fetch offset only after the current records have been enqueued
        // successfully. If a fetch request fails, the fetcher resets to fetchOffset to
        // avoid data loss.
        checkpointManager.setFetchOffset(job, lastProcessedOffset + 1);
      } else {
        if (job == null) {
          pausePartitionAndSeekOffset(tp, -1L);
        }
      }
    }
  }
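
For context, a caller would typically drive processFetchedData from a poll loop. The sketch below is a minimal, hypothetical illustration and not the actual fetcher loop of AbstractKafkaFetcherThread; consumer, isRunning(), and getCurrentTaskMap() are assumed names used only for this example, while processFetchedData and commitOffsets are the methods shown above.

  // Hypothetical caller sketch, for illustration only.
  void pollLoopSketch() throws InterruptedException {
    while (isRunning()) {
      // poll may return an empty batch; processFetchedData tolerates null/empty input
      ConsumerRecords<K, V> consumerRecords = consumer.poll(Duration.ofMillis(100));
      // map each assigned TopicPartition to the Job that currently owns it
      Map<TopicPartition, Job> taskMap = getCurrentTaskMap();
      processFetchedData(consumerRecords, taskMap);
      // flush the offsets recorded via checkpointManager.setOffsetToCommit(...)
      commitOffsets(taskMap);
    }
  }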