in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/common/TracedConsumerRecord.java [98:136]
protected Optional<Span> tryCreateSpan(Tracer tracer, String consumerGroup) {
final String topic = topic();
final Headers headers = headers();
if (tracer == null || headers == null) {
return Optional.empty();
}
Tracer.SpanBuilder spanBuilder =
tracer
.buildSpan(OPERATION_NAME)
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER)
.withTag(TAG_CONSUMER_GROUP, consumerGroup)
.withTag(TAG_TOPIC, topic);
Map<String, Object> fields = Collections.emptyMap();
try {
SpanContext spanContext =
tracer.extract(Format.Builtin.TEXT_MAP, new HeadersMapExtractAdapter(headers));
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
} else {
return Optional.empty();
}
} catch (Throwable throwable) {
spanBuilder = spanBuilder.withTag(Tags.ERROR, Boolean.TRUE);
fields =
ImmutableMap.of(
Fields.ERROR_OBJECT,
new RuntimeException("parent span context extract failed", throwable));
}
Span span = spanBuilder.start();
if (!fields.isEmpty()) {
span.log(fields);
}
SpanContext spanContext = span.context();
tracer.inject(spanContext, Format.Builtin.TEXT_MAP, new HeadersMapInjectAdapter(headers));
return Optional.of(span);
}