v2/otel/otel.go (89 lines of code) (raw):
package otel
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// Extract extracts the remote trace context from the message if exists, and set it onto ctx
func Extract(ctx context.Context, message *azservicebus.ReceivedMessage) context.Context {
if message != nil {
ctx, _ = getRemoteParentSpan(ctx, message)
}
return ctx
}
func MessageAttributes(message *azservicebus.ReceivedMessage) []attribute.KeyValue {
var attrs []attribute.KeyValue
if message != nil {
attrs = append(attrs, attribute.String("message.id", message.MessageID))
if message.CorrelationID != nil {
attrs = append(attrs, attribute.String("message.correlationId", *message.CorrelationID))
}
if message.ScheduledEnqueueTime != nil {
attrs = append(attrs, attribute.String("message.scheduledEnqueuedTime", message.ScheduledEnqueueTime.String()))
}
if message.TimeToLive != nil {
attrs = append(attrs, attribute.String("message.ttl", message.TimeToLive.String()))
}
}
return attrs
}
// This method will extract the remote trace context from the traceCarrier if exists,
// and inject it into the passed in context. If the traceCarrier doesn't contain a
// valid trace context, the passed in context will be returned as is.
// the remote span is returned for UT purpose only
func getRemoteParentSpan(ctx context.Context, message *azservicebus.ReceivedMessage) (context.Context, trace.Span) {
propogator := propagation.TraceContext{}
ctx = propogator.Extract(ctx, ReceivedMessageCarrierAdapter(message))
return ctx, trace.SpanFromContext(ctx)
}
// the implementaion of TextMapCarrier interface for receiver side
type receivedMessageWrapper struct {
message *azservicebus.ReceivedMessage
}
// ReceivedMessageCarrierAdapter wraps a servicebus ReceivedMessage so that it implements the TextMapCarrier interface
func ReceivedMessageCarrierAdapter(message *azservicebus.ReceivedMessage) propagation.TextMapCarrier {
return &receivedMessageWrapper{message: message}
}
func (mw *receivedMessageWrapper) Set(key string, value string) {
if mw.message.ApplicationProperties == nil {
mw.message.ApplicationProperties = make(map[string]interface{})
}
mw.message.ApplicationProperties[key] = value
}
func (mw *receivedMessageWrapper) Get(key string) string {
if mw.message.ApplicationProperties == nil || mw.message.ApplicationProperties[key] == nil {
return ""
}
return mw.message.ApplicationProperties[key].(string)
}
func (mw *receivedMessageWrapper) Keys() []string {
keys := make([]string, 0, len(mw.message.ApplicationProperties))
for k := range mw.message.ApplicationProperties {
keys = append(keys, k)
}
return keys
}
func Inject(ctx context.Context, msg *azservicebus.Message) {
propogator := propagation.TraceContext{}
propogator.Inject(ctx, MessageCarrierAdapter(msg))
}
// the implementaion of TextMapCarrier interface for the sender side
type messageWrapper struct {
message *azservicebus.Message
}
// MessageCarrierAdapter wraps a azservicebus.Message so that it implements the propagation.TextMapCarrier interface
func MessageCarrierAdapter(message *azservicebus.Message) propagation.TextMapCarrier {
return &messageWrapper{message: message}
}
func (mw *messageWrapper) Set(key string, value string) {
if mw.message.ApplicationProperties == nil {
mw.message.ApplicationProperties = make(map[string]interface{})
}
mw.message.ApplicationProperties[key] = value
}
func (mw *messageWrapper) Get(key string) string {
if mw.message.ApplicationProperties == nil || mw.message.ApplicationProperties[key] == nil {
return ""
}
return mw.message.ApplicationProperties[key].(string)
}
func (mw *messageWrapper) Keys() []string {
keys := make([]string, 0, len(mw.message.ApplicationProperties))
for k := range mw.message.ApplicationProperties {
keys = append(keys, k)
}
return keys
}