in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorMessage.java [173:228]
/**
 * Creates a {@link ProcessorMessage} from a consumed Kafka record.
 *
 * <p>When {@link RetryUtils} reports the record's topic as a retry, DLQ or resq topic of the
 * given job, the record key is parsed as {@link DLQMetadata}; the message key, the second
 * topic/partition/offset/timestamp group, and the retry and timeout counts are then taken from
 * that metadata. For any other topic those values come straight from the record itself and
 * both counts are {@code 0}.
 *
 * @param consumerRecord the consumed record; if it is a {@link TracedConsumerRecord} its span
 *     is forwarded, otherwise an empty span is used
 * @param job the job supplying the cluster and consumer group, and used for topic classification
 * @param infra passed through unchanged to the {@link ProcessorMessage} constructor
 * @param stub passed through unchanged to the {@link ProcessorMessage} constructor
 * @return the newly constructed message
 * @throws Exception declared broadly; within this method the visible failure source is
 *     {@code DLQMetadata.parseFrom} on the record key
 */
public static ProcessorMessage of(
    ConsumerRecord<byte[], byte[]> consumerRecord, Job job, CoreInfra infra, MessageStub stub)
    throws Exception {
  // Start with no span and only replace it when the record carries one.
  Optional<Span> span = Optional.empty();
  if (consumerRecord instanceof TracedConsumerRecord) {
    span = ((TracedConsumerRecord) consumerRecord).span();
  }

  final String sourceTopic = consumerRecord.topic();
  // Same short-circuit order as a retry || DLQ || resq check, expressed as its negation.
  final boolean plainTopic =
      !RetryUtils.isRetryTopic(sourceTopic, job)
          && !RetryUtils.isDLQTopic(sourceTopic, job)
          && !RetryUtils.isResqTopic(sourceTopic, job);

  if (plainTopic) {
    // The record describes itself: both coordinate groups are identical, no retries/timeouts.
    final int partition = consumerRecord.partition();
    final long offset = consumerRecord.offset();
    final long timestamp = consumerRecord.timestamp();
    return new ProcessorMessage(
        consumerRecord.key(),
        consumerRecord.value(),
        consumerRecord.headers(),
        sourceTopic,
        job.getKafkaConsumerTask().getCluster(),
        partition,
        offset,
        timestamp,
        job.getKafkaConsumerTask().getConsumerGroup(),
        sourceTopic,
        partition,
        offset,
        timestamp,
        0,
        0,
        span,
        infra,
        stub);
  }

  // Retry/DLQ/resq record: the key holds serialized DLQMetadata wrapping the message key data.
  final DLQMetadata metadata = DLQMetadata.parseFrom(consumerRecord.key());
  return new ProcessorMessage(
      metadata.getData().toByteArray(),
      consumerRecord.value(),
      consumerRecord.headers(),
      sourceTopic,
      job.getKafkaConsumerTask().getCluster(),
      consumerRecord.partition(),
      consumerRecord.offset(),
      consumerRecord.timestamp(),
      job.getKafkaConsumerTask().getConsumerGroup(),
      metadata.getTopic(),
      metadata.getPartition(),
      metadata.getOffset(),
      metadata.getTimestampNs(),
      metadata.getRetryCount(),
      metadata.getTimeoutCount(),
      span,
      infra,
      stub);
}