public CompletionStage submit()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [1003:1166]


  public CompletionStage<Long> submit(ItemAndJob<ConsumerRecord<byte[], byte[]>> request) {
    final Job job = request.getJob();
    final ConsumerRecord<byte[], byte[]> consumerRecord = request.getItem();
    final StructuredArgument[] logTags =
        new StructuredArgument[] {
          StructuredArguments.keyValue(StructuredFields.URI, addressFromUri),
          StructuredArguments.keyValue(
              StructuredFields.KAFKA_GROUP, job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredArguments.keyValue(
              StructuredFields.KAFKA_CLUSTER, job.getKafkaConsumerTask().getCluster()),
          StructuredArguments.keyValue(
              StructuredFields.KAFKA_TOPIC, job.getKafkaConsumerTask().getTopic()),
          StructuredArguments.keyValue(
              StructuredFields.KAFKA_PARTITION,
              Integer.toString(job.getKafkaConsumerTask().getPartition()))
        };
    Map<String, String> metricsTags = getMetricsTags(job);
    final String[] tags =
        metricsTags
            .entrySet()
            .stream()
            .flatMap(e -> Stream.of(e.getKey(), e.getValue()))
            .toArray(String[]::new);

    infra.contextManager().createRootContext();

    // TODO (gteo): cleanup / deprecate processing scope
    Scope processingScope = infra.scope().tagged(metricsTags);

    // use completable future chaining so that any exception thrown by an earlier stage
    // will automatically terminate the chain.
    final Stopwatch prefetchLatency =
        processingScope.timer("processor.prefetch.overall.latency").start();
    return Instrumentation.instrument
        .withExceptionalCompletion(
            LOGGER,
            infra.scope(),
            // Create a new processor message
            // DirectSupplier creates it on this thread.
            () ->
                DirectSupplier.supply(
                    () -> {
                      ProcessorMessage processorMessage =
                          ProcessorMessage.of(
                              consumerRecord,
                              job,
                              infra,
                              new MessageStub(logTags, processingScope));
                      acquireQuota(job, processorMessage);
                      return processorMessage;
                    }),
            "processor.prefetch.new-message",
            tags)
        .thenApply(
            pm ->
                Instrumentation.instrument.withRuntimeException(
                    LOGGER,
                    infra.scope(),
                    // Enqueue this to the ack manager, which acts as a prefetch buffer and
                    // out of order commit tracker.
                    () -> {
                      ackManager.receive(pm);
                      return pm;
                    },
                    "processor.prefetch.ack-manager.receive",
                    tags))
        .thenApply(
            pm ->
                Instrumentation.instrument.withRuntimeException(
                    LOGGER,
                    infra.scope(),
                    () -> {
                      if (!messageFilter.shouldProcess(ItemAndJob.of(pm, job))) {
                        pm.setOffsetToCommit(ackManager.ack(pm));
                        pm.setShouldDispatch(false);

                        processingScope
                            .counter("processor.prefetch.filter.cluster.filtered")
                            .inc(1);
                        LOGGER.debug("processor.prefetch.filter.cluster.filtered", logTags);
                      }
                      return pm;
                    },
                    "processor.prefetch.filter.cluster",
                    tags))
        .thenApplyAsync(
            // From here onwards, we enter the dispatch phase, where we send messages via gRPC or to
            // DLQ.
            // The dispatch phases uses separate executor pool so that we can send multiple messages
            // concurrently.
            pm -> pm,
            executor)
        .whenComplete(
            (r, t) -> {
              // metrics and log overall prefetch stage.
              prefetchLatency.stop();
              if (t != null) {
                processingScope
                    .tagged(ImmutableMap.of(Tags.Key.code, t.getClass().getSimpleName()))
                    .counter("processor.prefetch.overall")
                    .inc(1);
              } else {
                processingScope
                    .tagged(ImmutableMap.of(Tags.Key.code, "ok"))
                    .counter("processor.prefetch.overall")
                    .inc(1);
              }
            })
        .thenCompose(
            pm ->
                infra
                    .contextManager()
                    .runAsync(infra.contextManager()::createRootContext, executor)
                    .thenApply(ignoredVoid -> pm))
        .thenCompose(
            pm ->
                supplyAsyncIfDispatchable(
                    pm,
                    () ->
                        Instrumentation.instrument.withExceptionalCompletion(
                            LOGGER,
                            infra.scope(),
                            () -> {
                              // Forward the messages to the dispatchers
                              // This handles the actual transmission to callee via gRPC
                              // and any follow up transmission to Kafka for DLQ.
                              return Failsafe.with(
                                      getKafkaProducerAsyncFallbackPolicy(
                                          ItemAndJob.of(pm, job), processingScope))
                                  .with(Scheduler.of(executor))
                                  .getStageAsync(
                                      () ->
                                          pm.getStub()
                                              .withRetryFuture(
                                                  Failsafe.with(
                                                          getGrpcRetryPolicy(job, processingScope))
                                                      .with(Scheduler.of(executor))
                                                      .getStageAsync(
                                                          infra
                                                              .contextManager()
                                                              .wrap(
                                                                  dispatchToGrpcOutbound(
                                                                      ItemAndJob.of(pm, job),
                                                                      tags)))))
                                  .whenComplete(
                                      (r, t) -> {
                                        try {
                                          pm.close(r, t);
                                        } catch (Exception e) {
                                          LOGGER.error("failed to close processor message", e);
                                        }
                                      })
                                  .thenApply(
                                      r -> {
                                        pm.setOffsetToCommit(r.getOffset());
                                        return pm;
                                      });
                            },
                            "processor.dispatch",
                            tags)))
        .thenApply(
            // Convert response to commit offsets.
            r -> r.getOffsetToCommit());
  }