receiver/fluentforwardreceiver/conversion.go (323 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package fluentforwardreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver" import ( "bytes" "compress/gzip" "errors" "fmt" "io" "time" "github.com/tinylib/msgp/msgp" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" ) const tagAttributeKey = "fluent.tag" // Most of this logic is derived directly from // https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1, // which describes the fields in much greater detail. type Event interface { DecodeMsg(dc *msgp.Reader) error LogRecords() plog.LogRecordSlice Chunk() string Compressed() string } type OptionsMap map[string]any // Chunk returns the `chunk` option or blank string if it was not set. func (om OptionsMap) Chunk() string { c, _ := om["chunk"].(string) return c } func (om OptionsMap) Compressed() string { compressed, _ := om["compressed"].(string) return compressed } type EventMode int type Peeker interface { Peek(n int) ([]byte, error) } // Values for enum EventMode. const ( UnknownMode EventMode = iota MessageMode ForwardMode PackedForwardMode ) func (em EventMode) String() string { switch em { case UnknownMode: return "unknown" case MessageMode: return "message" case ForwardMode: return "forward" case PackedForwardMode: return "packedforward" default: panic("programmer bug") } } // parseInterfaceToMap takes map of interface objects and returns // AttributeValueMap func parseInterfaceToMap(msi map[string]any, dest pcommon.Value) { am := dest.SetEmptyMap() am.EnsureCapacity(len(msi)) for k, value := range msi { parseToAttributeValue(value, am.PutEmpty(k)) } } // parseInterfaceToArray takes array of interface objects and returns // AttributeValueArray func parseInterfaceToArray(ai []any, dest pcommon.Value) { av := dest.SetEmptySlice() av.EnsureCapacity(len(ai)) for _, value := range ai { parseToAttributeValue(value, av.AppendEmpty()) } } // parseToAttributeValue converts interface object to AttributeValue func parseToAttributeValue(val any, dest pcommon.Value) { // See https://github.com/tinylib/msgp/wiki/Type-Mapping-Rules switch r := val.(type) { case bool: dest.SetBool(r) case string: dest.SetStr(r) case uint64: dest.SetInt(int64(r)) case int64: dest.SetInt(r) // Sometimes strings come in as bytes array case []byte: dest.SetStr(string(r)) case map[string]any: parseInterfaceToMap(r, dest) case []any: parseInterfaceToArray(r, dest) case float32: dest.SetDouble(float64(r)) case float64: dest.SetDouble(r) case nil: default: dest.SetStr(fmt.Sprintf("%v", val)) } } func timeFromTimestamp(ts any) (time.Time, error) { switch v := ts.(type) { case uint64: return time.Unix(int64(v), 0), nil case int64: return time.Unix(v, 0), nil case *eventTimeExt: return time.Time(*v), nil default: return time.Time{}, fmt.Errorf("unknown type of value: %v", ts) } } func parseRecordToLogRecord(dc *msgp.Reader, lr plog.LogRecord) error { tsIntf, err := dc.ReadIntf() if err != nil { return msgp.WrapError(err, "Time") } ts, err := timeFromTimestamp(tsIntf) if err != nil { return msgp.WrapError(err, "Time") } lr.SetTimestamp(pcommon.NewTimestampFromTime(ts)) recordLen, err := dc.ReadMapHeader() if err != nil { return msgp.WrapError(err, "Record") } for recordLen > 0 { recordLen-- key, err := dc.ReadString() if err != nil { // The protocol doesn't specify this but apparently some map keys // can be binary type instead of string keyBytes, keyBytesErr := dc.ReadBytes(nil) if keyBytesErr != nil { return msgp.WrapError(keyBytesErr, "Record") } key = string(keyBytes) } val, err := dc.ReadIntf() if err != nil { return msgp.WrapError(err, "Record", key) } // fluentd uses message, fluentbit log. if key == "message" || key == "log" { parseToAttributeValue(val, lr.Body()) } else { parseToAttributeValue(val, lr.Attributes().PutEmpty(key)) } } return nil } type MessageEventLogRecord struct { plog.LogRecordSlice OptionsMap } func (melr *MessageEventLogRecord) LogRecords() plog.LogRecordSlice { return melr.LogRecordSlice } func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error { melr.LogRecordSlice = plog.NewLogRecordSlice() var arrLen uint32 var err error arrLen, err = dc.ReadArrayHeader() if err != nil { return err } if arrLen > 4 || arrLen < 3 { return msgp.ArrayError{Wanted: 3, Got: arrLen} } tag, err := dc.ReadString() if err != nil { return msgp.WrapError(err, "Tag") } log := melr.AppendEmpty() attrs := log.Attributes() attrs.PutStr(tagAttributeKey, tag) err = parseRecordToLogRecord(dc, log) if err != nil { return err } if arrLen == 4 { melr.OptionsMap, err = parseOptions(dc) return err } return nil } func parseOptions(dc *msgp.Reader) (OptionsMap, error) { var optionLen uint32 optionLen, err := dc.ReadMapHeader() if err != nil { return nil, msgp.WrapError(err, "Option") } out := make(OptionsMap, optionLen) for optionLen > 0 { optionLen-- key, err := dc.ReadString() if err != nil { return nil, msgp.WrapError(err, "Option") } val, err := dc.ReadIntf() if err != nil { return nil, msgp.WrapError(err, "Option", key) } out[key] = val } return out, nil } type ForwardEventLogRecords struct { plog.LogRecordSlice OptionsMap } func (fe *ForwardEventLogRecords) LogRecords() plog.LogRecordSlice { return fe.LogRecordSlice } func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error { fe.LogRecordSlice = plog.NewLogRecordSlice() arrLen, err := dc.ReadArrayHeader() if err != nil { return err } if arrLen < 2 || arrLen > 3 { return msgp.ArrayError{Wanted: 2, Got: arrLen} } tag, err := dc.ReadString() if err != nil { return msgp.WrapError(err, "Tag") } entryLen, err := dc.ReadArrayHeader() if err != nil { return msgp.WrapError(err, "Record") } fe.EnsureCapacity(int(entryLen)) for i := 0; i < int(entryLen); i++ { lr := fe.AppendEmpty() err = parseEntryToLogRecord(dc, lr) if err != nil { return msgp.WrapError(err, "Entries", i) } lr.Attributes().PutStr(tagAttributeKey, tag) } if arrLen == 3 { fe.OptionsMap, err = parseOptions(dc) return err } return nil } func parseEntryToLogRecord(dc *msgp.Reader, lr plog.LogRecord) error { arrLen, err := dc.ReadArrayHeader() if err != nil { return err } if arrLen != 2 { return msgp.ArrayError{Wanted: 2, Got: arrLen} } return parseRecordToLogRecord(dc, lr) } type PackedForwardEventLogRecords struct { plog.LogRecordSlice OptionsMap } func (pfe *PackedForwardEventLogRecords) LogRecords() plog.LogRecordSlice { return pfe.LogRecordSlice } // DecodeMsg implements msgp.Decodable. This was originally code generated but // then manually copied here in order to handle the optional Options field. func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error { pfe.LogRecordSlice = plog.NewLogRecordSlice() arrLen, err := dc.ReadArrayHeader() if err != nil { return err } if arrLen < 2 || arrLen > 3 { return msgp.ArrayError{Wanted: 2, Got: arrLen} } tag, err := dc.ReadString() if err != nil { return msgp.WrapError(err, "Tag") } entriesFirstByte, err := dc.R.Peek(1) if err != nil { return msgp.WrapError(err, "EntriesRaw") } entriesType := msgp.NextType(entriesFirstByte) // We have to read out the entries raw all the way first because we don't // know whether it is compressed or not until we read the options map which // comes after. I guess we could use some kind of detection logic to // determine if it is gzipped by peeking and just ignoring options, but // this seems simpler for now. var entriesRaw []byte switch entriesType { case msgp.StrType: var entriesStr string entriesStr, err = dc.ReadString() if err != nil { return msgp.WrapError(err, "EntriesRaw") } entriesRaw = []byte(entriesStr) case msgp.BinType: entriesRaw, err = dc.ReadBytes(nil) if err != nil { return msgp.WrapError(err, "EntriesRaw") } default: return msgp.WrapError(fmt.Errorf("invalid type %d", entriesType), "EntriesRaw") } if arrLen == 3 { pfe.OptionsMap, err = parseOptions(dc) if err != nil { return err } } err = pfe.parseEntries(entriesRaw, pfe.Compressed() == "gzip", tag) if err != nil { return err } return nil } func (pfe *PackedForwardEventLogRecords) parseEntries(entriesRaw []byte, isGzipped bool, tag string) error { var reader io.Reader reader = bytes.NewReader(entriesRaw) if isGzipped { var err error reader, err = gzip.NewReader(reader) if err != nil { return err } defer reader.(*gzip.Reader).Close() } msgpReader := msgp.NewReader(reader) // Allocate only once, since the MoveTo cleans the lr, so we can reuse. lr := plog.NewLogRecord() for { err := parseEntryToLogRecord(msgpReader, lr) if err != nil { if errors.Is(msgp.Cause(err), io.EOF) { return nil } return err } lr.Attributes().PutStr(tagAttributeKey, tag) lr.MoveTo(pfe.AppendEmpty()) } }