exporter/kafkaexporter/internal/marshaler/raw_marshaler.go (61 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package marshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler" import ( "encoding/json" "errors" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" ) var ( errUnsupported = errors.New("unsupported serialization") _ LogsMarshaler = RawLogsMarshaler{} ) type RawLogsMarshaler struct{} func (r RawLogsMarshaler) MarshalLogs(logs plog.Logs) ([]Message, error) { var messages []Message for i := 0; i < logs.ResourceLogs().Len(); i++ { rl := logs.ResourceLogs().At(i) for j := 0; j < rl.ScopeLogs().Len(); j++ { sl := rl.ScopeLogs().At(j) for k := 0; k < sl.LogRecords().Len(); k++ { lr := sl.LogRecords().At(k) b, err := r.logBodyAsBytes(lr.Body()) if err != nil { return nil, err } if len(b) == 0 { continue } messages = append(messages, Message{Value: b}) } } } return messages, nil } func (r RawLogsMarshaler) logBodyAsBytes(value pcommon.Value) ([]byte, error) { switch value.Type() { case pcommon.ValueTypeStr: return r.interfaceAsBytes(value.Str()) case pcommon.ValueTypeBytes: return value.Bytes().AsRaw(), nil case pcommon.ValueTypeBool: return r.interfaceAsBytes(value.Bool()) case pcommon.ValueTypeDouble: return r.interfaceAsBytes(value.Double()) case pcommon.ValueTypeInt: return r.interfaceAsBytes(value.Int()) case pcommon.ValueTypeEmpty: return []byte{}, nil case pcommon.ValueTypeSlice: return r.interfaceAsBytes(value.Slice().AsRaw()) case pcommon.ValueTypeMap: return r.interfaceAsBytes(value.Map().AsRaw()) default: return nil, errUnsupported } } func (r RawLogsMarshaler) interfaceAsBytes(value any) ([]byte, error) { if value == nil { return []byte{}, nil } return json.Marshal(value) }