receiver/kafkareceiver/internal/unmarshaler/text_logs_unmarshaler.go (34 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package unmarshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/unmarshaler" import ( "errors" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils" ) var _ plog.Unmarshaler = (*TextLogsUnmarshaler)(nil) type TextLogsUnmarshaler struct { decoder *encoding.Decoder } func NewTextLogsUnmarshaler(encodingName string) (*TextLogsUnmarshaler, error) { encoding, err := textutils.LookupEncoding(encodingName) if err != nil { return nil, err } return &TextLogsUnmarshaler{decoder: encoding.NewDecoder()}, nil } func (r *TextLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { if r.decoder == nil { return plog.Logs{}, errors.New("encoding not set") } p := plog.NewLogs() decoded, err := textutils.DecodeAsString(r.decoder, buf) if err != nil { return p, err } l := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() l.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) l.Body().SetStr(decoded) return p, nil }