public static ProcessorMessage of()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorMessage.java [173:228]


  public static ProcessorMessage of(
      ConsumerRecord<byte[], byte[]> consumerRecord, Job job, CoreInfra infra, MessageStub stub)
      throws Exception {
    Optional<Span> span;
    if (consumerRecord instanceof TracedConsumerRecord) {
      span = ((TracedConsumerRecord) consumerRecord).span();
    } else {
      span = Optional.empty();
    }

    if (RetryUtils.isRetryTopic(consumerRecord.topic(), job)
        || RetryUtils.isDLQTopic(consumerRecord.topic(), job)
        || RetryUtils.isResqTopic(consumerRecord.topic(), job)) {

      DLQMetadata dlqMetadata = DLQMetadata.parseFrom(consumerRecord.key());
      return new ProcessorMessage(
          dlqMetadata.getData().toByteArray(),
          consumerRecord.value(),
          consumerRecord.headers(),
          consumerRecord.topic(),
          job.getKafkaConsumerTask().getCluster(),
          consumerRecord.partition(),
          consumerRecord.offset(),
          consumerRecord.timestamp(),
          job.getKafkaConsumerTask().getConsumerGroup(),
          dlqMetadata.getTopic(),
          dlqMetadata.getPartition(),
          dlqMetadata.getOffset(),
          dlqMetadata.getTimestampNs(),
          dlqMetadata.getRetryCount(),
          dlqMetadata.getTimeoutCount(),
          span,
          infra,
          stub);
    }

    return new ProcessorMessage(
        consumerRecord.key(),
        consumerRecord.value(),
        consumerRecord.headers(),
        consumerRecord.topic(),
        job.getKafkaConsumerTask().getCluster(),
        consumerRecord.partition(),
        consumerRecord.offset(),
        consumerRecord.timestamp(),
        job.getKafkaConsumerTask().getConsumerGroup(),
        consumerRecord.topic(),
        consumerRecord.partition(),
        consumerRecord.offset(),
        consumerRecord.timestamp(),
        0,
        0,
        span,
        infra,
        stub);
  }