collector/export/log_fluent.go (208 lines of code) (raw):

package export import ( "context" "encoding/binary" "fmt" "net" "slices" "strings" "sync" "time" "github.com/Azure/adx-mon/collector/logs/types" "github.com/Azure/adx-mon/metrics" "github.com/tinylib/msgp/msgp" ) type LogToFluentExporterOpts struct { // Destination is the fluent protocol destination. // It is a string of the form "tcp://host:port" or "unix:///path/to/socket" Destination string // TagAttribute is the attribute key to extract the fluent tag from. // If the attribute within a log is not set, the log will be ignored. TagAttribute string } // LogToFluentExporter exports logs using the fluent-forward protocol type LogToFluentExporter struct { destinationNetwork string destination string encoderPool *sync.Pool conn net.Conn } func NewLogToFluentExporter(opts LogToFluentExporterOpts) (*LogToFluentExporter, error) { var destination string var destinationNetwork string if strings.HasPrefix(opts.Destination, "unix://") { destination = strings.TrimPrefix(opts.Destination, "unix://") destinationNetwork = "unix" } else if strings.HasPrefix(opts.Destination, "tcp://") { destination = strings.TrimPrefix(opts.Destination, "tcp://") destinationNetwork = "tcp" } else { return nil, fmt.Errorf("invalid destination %s: must be in the form tcp://<host>:<port> or unix:///path/to/socket", opts.Destination) } if opts.TagAttribute == "" { return nil, fmt.Errorf("TagAttribute must be set") } return &LogToFluentExporter{ destinationNetwork: destinationNetwork, destination: destination, encoderPool: &sync.Pool{ New: func() interface{} { return newFluentEncoder(opts.TagAttribute) }, }, }, nil } func (e *LogToFluentExporter) Open(ctx context.Context) error { return nil } func (e *LogToFluentExporter) Close() error { return nil } func (e *LogToFluentExporter) Send(ctx context.Context, batch *types.LogBatch) error { encoder := e.encoderPool.Get().(*fluentEncoder) defer e.encoderPool.Put(encoder) data, err := encoder.encode(batch) if err != nil { return err } if len(data) == 0 { return nil // No logs to send } conn, err := e.dial(ctx) if err != nil { return fmt.Errorf("failed to connect to %s: %w", e.destination, err) } defer conn.Close() // Set write deadline if deadline, ok := ctx.Deadline(); ok { conn.SetWriteDeadline(deadline) } else { conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) } // Write the data _, err = conn.Write(data) if err != nil { metrics.CollectorExporterFailed.WithLabelValues(e.Name(), e.destination).Add(float64(len(batch.Logs))) return fmt.Errorf("failed to write to %s: %w", e.destination, err) } metrics.CollectorExporterSent.WithLabelValues(e.Name(), e.destination).Add(float64(len(batch.Logs))) return nil } func (e *LogToFluentExporter) dial(ctx context.Context) (net.Conn, error) { var d net.Dialer d.Timeout = 10 * time.Second // Adjust timeout as needed return d.DialContext(ctx, e.destinationNetwork, e.destination) } func (e *LogToFluentExporter) Name() string { return "LogToFluentExporter" } // fluentEncoder encodes logs in the fluentd forward format type fluentEncoder struct { fluentExtTime *fluentExtTime w []byte batchToSend []*types.Log tagAttribute string } func newFluentEncoder(tagAttribute string) *fluentEncoder { return &fluentEncoder{ fluentExtTime: &fluentExtTime{}, tagAttribute: tagAttribute, } } // encode encodes a log batch into the fluentd forward format. // Not multi-goroutine safe. Returned bytes are only valid until the next call. func (e *fluentEncoder) encode(batch *types.LogBatch) ([]byte, error) { // Copy logbatch elements into new slice so we can sort it without modifying the original, // which is shared amongst other outputs. e.batchToSend = e.batchToSend[:0] for _, log := range batch.Logs { e.addBatchToSend(log) } if len(e.batchToSend) == 0 { return nil, nil } // Sort based on tags. This allows us to create a message per tag. slices.SortFunc(e.batchToSend, func(a, b *types.Log) int { tagOne := types.StringOrEmpty(a.GetAttributeValue(e.tagAttribute)) tagTwo := types.StringOrEmpty(b.GetAttributeValue(e.tagAttribute)) return strings.Compare(tagOne, tagTwo) }) e.w = e.w[:0] var err error activeTag := "" activeTagStart := 0 for idx, log := range e.batchToSend { currTag := types.StringOrEmpty(log.GetAttributeValue(e.tagAttribute)) if idx == 0 { activeTag = currTag } if idx == len(e.batchToSend)-1 { err = e.appendMsg(e.batchToSend[activeTagStart:], activeTag) if err != nil { return nil, err // bail - message is now invalid. } return e.w, nil } peekTag := types.StringOrEmpty(e.batchToSend[idx+1].GetAttributeValue(e.tagAttribute)) if peekTag != activeTag { err = e.appendMsg(e.batchToSend[activeTagStart:idx+1], activeTag) if err != nil { return nil, err // bail - message is now invalid. } activeTag = peekTag activeTagStart = idx + 1 } } return e.w, nil } func (e *fluentEncoder) addBatchToSend(log *types.Log) { attr := types.StringOrEmpty(log.GetAttributeValue(e.tagAttribute)) if attr == "" { return } e.batchToSend = append(e.batchToSend, log) } func (e *fluentEncoder) appendMsg(batchToSend []*types.Log, tag string) error { // Write Forward mode. See https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode // We use the []byte oriented API instead of the writer-oriented API from msgp. // the Writer oriented one uses unsafe to slightly speed things up, but we have no reason to use this. // Additionally, the writer api does its own buffering and seems overly complicated for what we need. var err error e.w = msgp.AppendArrayHeader(e.w, 2) // [<tag>, <entries>] e.w = msgp.AppendString(e.w, tag) // <tag> e.w = msgp.AppendArrayHeader(e.w, uint32(len(batchToSend))) // <entries> for _, log := range batchToSend { e.w = msgp.AppendArrayHeader(e.w, 2) // [<time>, <record>] e.fluentExtTime.UnixTsNano = log.GetTimestamp() e.w, err = msgp.AppendExtension(e.w, e.fluentExtTime) // <time> if err != nil { return err // need to bail, message is now invalid. } e.w = msgp.AppendMapHeader(e.w, uint32(log.BodyLen())) // <record> err = log.ForEachBody(e.AppendMapElement) if err != nil { return err // need to bail, message is now invalid. } } return nil } func (e *fluentEncoder) AppendMapElement(key string, val any) error { var err error e.w = msgp.AppendString(e.w, key) e.w, err = msgp.AppendIntf(e.w, val) if err != nil { return err } return nil } // fluentExtTime implements msgp.Extension for the fluent extension timestamp. // https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format type fluentExtTime struct { UnixTsNano uint64 } func (e *fluentExtTime) ExtensionType() int8 { return 0 } func (e *fluentExtTime) Len() int { return 8 } func (e *fluentExtTime) MarshalBinaryTo(b []byte) error { if len(b) < 8 { return fmt.Errorf("buffer too small") } seconds := uint32(e.UnixTsNano / 1e9) nanoSeconds := uint32(e.UnixTsNano % 1e9) binary.BigEndian.PutUint32(b, uint32(seconds)) binary.BigEndian.PutUint32(b[4:], uint32(nanoSeconds)) return nil } func (e *fluentExtTime) UnmarshalBinary(b []byte) error { if len(b) < 8 { return fmt.Errorf("buffer too small") } seconds := binary.BigEndian.Uint32(b) nanoSeconds := binary.BigEndian.Uint32(b[4:]) e.UnixTsNano = uint64(seconds)*1e9 + uint64(nanoSeconds) return nil }