public CompletableFuture submit()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/DispatcherImpl.java [103:169]


  /**
   * Routes one message to the downstream dispatcher selected by the message type.
   *
   * <ul>
   *   <li>{@code GRPC}: forwards the gRPC payload to {@code grpcDispatcher} and maps the result
   *       through {@code dispatcherResponseFromGrpcStatus}.
   *   <li>{@code KAFKA}: forwards the producer record to the resilience-queue producer when
   *       {@code RetryUtils.isResqTopic} matches the destination topic, otherwise to {@code
   *       dlqProducer}; a normal completion is reported as {@code COMMIT}.
   * </ul>
   *
   * <p>A {@code dispatch} debug log line and a tagged {@code dispatch} counter increment are
   * emitted for every call, before the message type is inspected.
   *
   * @param item the message to dispatch together with the job it belongs to
   * @return a future carrying the dispatcher response; it is completed exceptionally with {@link
   *     IllegalStateException} when a resilience-queue destination is requested but no such
   *     producer is configured, and with {@link IllegalArgumentException} for any other message
   *     type
   */
  public CompletableFuture<DispatcherResponse> submit(ItemAndJob<DispatcherMessage> item) {
    final Job job = item.getJob();
    final DispatcherMessage message = item.getItem();

    final String rpcUri = job.getRpcDispatcherTask().getUri();
    final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
    final String sourceTopic = job.getKafkaConsumerTask().getTopic();
    final int sourcePartition = job.getKafkaConsumerTask().getPartition();
    final String mode = message.getType().toString();

    LOGGER.debug(
        MetricNames.DISPATCH,
        StructuredLogging.dispatcher(mode),
        StructuredLogging.rpcRoutingKey(rpcUri),
        StructuredLogging.kafkaGroup(consumerGroup),
        StructuredLogging.kafkaTopic(sourceTopic),
        StructuredLogging.kafkaPartition(sourcePartition),
        StructuredLogging.destination(message.getDestination()));

    // Only the per-type dispatch counter is emitted here: GrpcDispatcher and KafkaDispatcher
    // already report their own fine grained success/failure/latency metrics.
    infra
        .scope()
        .tagged(
            StructuredTags.builder()
                .setMode(mode)
                .setDestination(rpcUri)
                .setKafkaGroup(consumerGroup)
                .setKafkaTopic(sourceTopic)
                .setKafkaPartition(sourcePartition)
                .build())
        .counter(MetricNames.DISPATCH)
        .inc(1);

    switch (message.getType()) {
      case GRPC:
        {
          // The gRPC outcome is translated into a DispatcherResponse. Exceptional completions
          // are treated as in-memory retry by the processor, which resends the message to the
          // gRPC endpoint.
          return grpcDispatcher
              .submit(ItemAndJob.of(message.getGrpcMessage(), job))
              .thenApply(DispatcherImpl::dispatcherResponseFromGrpcStatus)
              .toCompletableFuture();
        }
      case KAFKA:
        {
          final boolean toResilienceQueue = RetryUtils.isResqTopic(message.getDestination(), job);
          if (toResilienceQueue && !resqProducer.isPresent()) {
            return failedResponse(
                new IllegalStateException("resilience queue producer is not present"));
          }
          final KafkaDispatcher<byte[], byte[]> producer =
              toResilienceQueue ? resqProducer.get() : dlqProducer;
          // A normally completed produce is reported as COMMIT. Exceptional completions are
          // treated as in-memory retry by the processor, which resends the message to the
          // Kafka Producer endpoint.
          return producer
              .submit(ItemAndJob.of(message.getProducerRecord(), job))
              .thenApply(ignored -> new DispatcherResponse(DispatcherResponse.Code.COMMIT))
              .toCompletableFuture();
        }
      default:
        return failedResponse(
            new IllegalArgumentException(
                String.format("unsupported message type %s", message.getType())));
    }
  }

  /** Builds a future that is already completed exceptionally with the given cause. */
  private static CompletableFuture<DispatcherResponse> failedResponse(Throwable cause) {
    final CompletableFuture<DispatcherResponse> failed = new CompletableFuture<>();
    failed.completeExceptionally(cause);
    return failed;
  }