public CompletionStage submit()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/dispatchers/kafka/KafkaDispatcher.java [76:125]


  /**
   * Sends the record carried by {@code request} through {@code kafkaProducer} and reports the
   * outcome through the returned stage.
   *
   * <p>A {@code SEND_LATENCY} timer is started before the send is issued and stopped when the
   * producer callback fires. The callback then resolves the stage as follows:
   *
   * <ul>
   *   <li>no exception: the {@code SEND} counter tagged with code {@code "ok"} is incremented and
   *       the stage completes with {@code null};
   *   <li>{@code RecordTooLargeException} on a topic whose name starts (ignoring case) with the
   *       hard-coded skip prefix: the {@code SKIPPED_LARGE_MESSAGE} counter is incremented and the
   *       stage completes with {@code null}, i.e. the oversized record is dropped on purpose;
   *   <li>any other exception: the {@code SEND} counter tagged with the exception's simple class
   *       name is incremented, the failure is logged, and the stage completes exceptionally with
   *       that exception.
   * </ul>
   *
   * <p>All metrics are tagged with the record's topic and the producer cluster resolved from the
   * job.
   *
   * @param request the job together with the producer record to send
   * @return a stage that completes with {@code null} on success (or deliberate skip) and
   *     exceptionally when the send fails
   */
  public CompletionStage<Void> submit(ItemAndJob<ProducerRecord<K, V>> request) {
    final Job job = request.getJob();
    final ProducerRecord<K, V> record = request.getItem();
    final String topic = record.topic();
    final String cluster = JobUtils.getKafkaProducerCluster(job);
    final Scope requestScope =
        infra
            .scope()
            .tagged(StructuredTags.builder().setKafkaTopic(topic).setKafkaCluster(cluster).build());
    final CompletableFuture<Void> future = new CompletableFuture<>();
    final Stopwatch stopwatch = requestScope.timer(MetricNames.SEND_LATENCY).start();

    kafkaProducer.send(
        record,
        (recordMetadata, e) -> {
          stopwatch.stop();

          if (e == null) {
            requestScope
                .tagged(ImmutableMap.of(Tags.Key.code, "ok"))
                .counter(MetricNames.SEND)
                .inc(1);
            LOGGER.debug(
                "dispatcher.kafka.send.success",
                StructuredLogging.kafkaTopic(topic),
                StructuredLogging.kafkaCluster(cluster));
            future.complete(null);
            return;
          }

          if (e instanceof RecordTooLargeException) {
            final String skipPrefix = "hp-dbevents-schemaless-jobstore-orders-ENTITY_FARES";
            // regionMatches(ignoreCase = true, ...) compares per character and does not depend on
            // the JVM default locale. The previous toLowerCase() calls did: under a Turkish/Azeri
            // default locale the 'I' in the prefix lower-cases to dotless 'ı', so a topic written
            // with a lowercase 'i' would no longer be recognised as skippable.
            if (topic.regionMatches(true, 0, skipPrefix, 0, skipPrefix.length())) {
              requestScope.counter(MetricNames.SKIPPED_LARGE_MESSAGE).inc(1);
              // Complete normally so the oversized record is treated as handled, not as a failure.
              future.complete(null);
              return;
            }
          }

          requestScope
              .tagged(ImmutableMap.of(Tags.Key.code, e.getClass().getSimpleName()))
              .counter(MetricNames.SEND)
              .inc(1);
          LOGGER.error(
              "dispatcher.kafka.send.failure",
              StructuredLogging.kafkaTopic(topic),
              StructuredLogging.kafkaCluster(cluster),
              e);
          future.completeExceptionally(e);
        });
    return future;
  }