exporter/kafkaexporter/internal/marshaler/jaeger_marshaler.go (49 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package marshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler" import ( "bytes" "github.com/gogo/protobuf/jsonpb" jaegerproto "github.com/jaegertracing/jaeger-idl/model/v1" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) var ( _ TracesMarshaler = JaegerProtoSpanMarshaler{} _ TracesMarshaler = JaegerJSONSpanMarshaler{} ) type JaegerProtoSpanMarshaler struct{} type JaegerJSONSpanMarshaler struct{} func (JaegerProtoSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) { return marshalJaeger(traces, marshalJaegerSpanProto) } func (JaegerJSONSpanMarshaler) MarshalTraces(traces ptrace.Traces) ([]Message, error) { return marshalJaeger(traces, marshalJaegerSpanJSON) } func marshalJaeger(traces ptrace.Traces, marshal marshalJaegerSpanFunc) ([]Message, error) { batches := jaeger.ProtoFromTraces(traces) var messages []Message var errs error for _, batch := range batches { for _, span := range batch.Spans { span.Process = batch.Process bts, err := marshal(span) // continue to process spans that can be serialized if err != nil { errs = multierr.Append(errs, err) continue } key := []byte(span.TraceID.String()) messages = append(messages, Message{Key: key, Value: bts}) } } return messages, errs } type marshalJaegerSpanFunc func(*jaegerproto.Span) ([]byte, error) func marshalJaegerSpanProto(span *jaegerproto.Span) ([]byte, error) { return span.Marshal() } func marshalJaegerSpanJSON(span *jaegerproto.Span) ([]byte, error) { var m jsonpb.Marshaler out := new(bytes.Buffer) err := m.Marshal(out, span) return out.Bytes(), err }