kafka-connect-bigtable-sink/doc/performance/MessageTracer.java (60 lines of code) (raw):
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.kafka.connect.bigtable.tracing;
import com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageTracer {
private static final String KAFKA_HEADER_NAME = "traceparent";
private static final Logger logger = LoggerFactory.getLogger(MessageTracer.class);
private static final OpenTelemetry otel = GlobalOpenTelemetry.get();
private static Context extractParentContext(SinkRecord record) {
Object traceparentHeaderValue =
Optional.ofNullable(record)
.map(SinkRecord::headers)
.map(hs -> hs.lastWithName(KAFKA_HEADER_NAME))
.map(Header::value)
.orElse(null);
String traceparent;
if (traceparentHeaderValue instanceof String) {
traceparent = (String) traceparentHeaderValue;
} else {
logger.warn(
"Parent not found for '{}' header value '{}'", KAFKA_HEADER_NAME, traceparentHeaderValue);
return null;
}
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/discussions/4546#discussioncomment-1572327
return W3CTraceContextPropagator.getInstance()
.extract(
Context.root(),
traceparent,
new TextMapGetter<>() {
@Override
public Iterable<String> keys(String carrier) {
return Collections.singleton(KAFKA_HEADER_NAME);
}
@Override
public String get(String carrier, String key) {
return key.equals(KAFKA_HEADER_NAME) ? carrier : null;
}
});
}
public static Span getRecordSpan(SinkRecord record, String spanName) {
Tracer tracer = otel.getTracer(BigtableSinkConnector.class.getName());
SpanBuilder spanBuilder = tracer.spanBuilder(spanName);
Context parentContext = extractParentContext(record);
if (parentContext != null) {
spanBuilder.setParent(parentContext);
}
return spanBuilder.startSpan();
}
}