transform/native_logs_csv.go (200 lines of code) (raw):

package transform

import (
	"bytes"
	"encoding/csv"
	"errors"
	"sort"
	"strings"
	"time"

	"github.com/Azure/adx-mon/collector/logs/types"
	"github.com/Azure/adx-mon/schema"
	"github.com/pquerna/ffjson/ffjson"
	fflib "github.com/pquerna/ffjson/fflib/v1"
)

type NativeLogsCSVWriter struct {
	w           *bytes.Buffer
	buf         *strings.Builder
	enc         *csv.Writer
	labelsBuf   *bytes.Buffer
	seriesIdBuf *bytes.Buffer
	line        []byte
	columns     [][]byte

	fields      []string
	fieldLookup map[string]struct{}

	headerWritten bool

	schema     schema.SchemaMapping
	schemaHash uint64
}

// NewCSVNativeLogsCSVWriter returns a new NativeLogsCSVWriter that writes to the given buffer. The columns, if
// specified, are label keys that will be promoted to columns.
func NewCSVNativeLogsCSVWriter(w *bytes.Buffer, columns []string) *NativeLogsCSVWriter {
	return NewCSVNativeLogsCSVWriterWithSchema(w, columns, schema.DefaultLogsMapping)
}

// NewCSVNativeLogsCSVWriterWithSchema returns a new NativeLogsCSVWriter that writes to the given buffer using the
// given schema mapping. The columns, if specified, are label keys that will be promoted to columns.
func NewCSVNativeLogsCSVWriterWithSchema(w *bytes.Buffer, columns []string, mapping schema.SchemaMapping) *NativeLogsCSVWriter {
	writer := &NativeLogsCSVWriter{
		w:           w,
		buf:         &strings.Builder{},
		seriesIdBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
		labelsBuf:   bytes.NewBuffer(make([]byte, 0, 1024)),
		enc:         csv.NewWriter(w),
		line:        make([]byte, 0, 4096),
		columns:     make([][]byte, 0, len(columns)),
		fields:      make([]string, 0, 4+len(columns)),
		schemaHash:  schema.SchemaHash(mapping),
		schema:      mapping,
		fieldLookup: make(map[string]struct{}, len(columns)),
	}
	writer.InitColumns(columns)
	return writer
}

func otlpTSToUTC(ts int64) string {
	// Distinguish millisecond- from nanosecond-precision timestamps: a
	// current-era nanosecond timestamp does not fit in 53 bits, so a value
	// that does fit is treated as milliseconds.
	if ts&0x1fffffffffffff == ts {
		return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond)).UTC().Format(time.RFC3339Nano)
	}
	return time.Unix(0, ts).UTC().Format(time.RFC3339Nano)
}
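// Worked example of the heuristic above (values are illustrative):
// 1700000000123 fits in 53 bits and is parsed as milliseconds, producing
// "2023-11-14T22:13:20.123Z"; 1700000000123000000 does not fit and is
// parsed as nanoseconds via time.Unix(0, ts), producing the same instant.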
func (w *NativeLogsCSVWriter) MarshalNativeLog(log *types.Log) error {
	if !w.headerWritten {
		line := w.line[:0]
		line = schema.AppendCSVHeader(line, w.schema)
		if n, err := w.w.Write(line); err != nil {
			return err
		} else if n != len(line) {
			return errors.New("short write")
		}
		w.headerWritten = true
	}

	// There are 9 fields defined in an OTLP log schema
	fields := make([]string, 0, 9)

	// Convert log records to CSV
	// see samples at https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#examples

	// Reset fields
	fields = fields[:0]

	// Timestamp
	fields = append(fields, otlpTSToUTC(int64(log.GetTimestamp())))

	// ObservedTimestamp
	if v := log.GetObservedTimestamp(); v > 0 { // Some clients don't set this value.
		fields = append(fields, otlpTSToUTC(int64(log.GetObservedTimestamp())))
	} else {
		fields = append(fields, time.Now().UTC().Format(time.RFC3339Nano))
	}

	// TraceId - we don't have this
	fields = append(fields, "")
	// SpanId - we don't have this
	fields = append(fields, "")
	// SeverityText - we don't have this
	fields = append(fields, "")
	// SeverityNumber - we don't have this
	fields = append(fields, "")

	// Body
	buf := w.buf
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField := false
	err := log.ForEachBody(func(k string, v any) error {
		val, err := ffjson.Marshal(v)
		if err != nil {
			return nil // Just skip this one
		}
		if hasPrevField {
			buf.WriteByte(',')
		} else {
			hasPrevField = true
		}
		fflib.WriteJson(buf, []byte(k))
		buf.WriteByte(':')
		buf.Write(val) // Already marshalled into json. Don't escape it again.
		ffjson.Pool(val)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	// Resource
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField = false
	err = log.ForEachResource(func(k string, v any) error {
		_, lifted := w.fieldLookup[k]
		// These are added by the collector and used internally. Don't store them in the final table.
		if strings.HasPrefix(k, "adxmon_") || strings.HasPrefix(k, "label.") || strings.HasPrefix(k, "annotation.") || lifted {
			return nil
		}

		val, err := ffjson.Marshal(v)
		if err != nil {
			return nil // Just skip this one
		}
		if hasPrevField {
			buf.WriteByte(',')
		} else {
			hasPrevField = true
		}
		fflib.WriteJson(buf, []byte(k))
		buf.WriteByte(':')
		buf.Write(val) // Already marshalled into json. Don't escape it again.
		ffjson.Pool(val)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	// Attributes
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField = false
	err = log.ForEachAttribute(func(k string, v any) error {
		if strings.HasPrefix(k, "adxmon_") {
			return nil
		}
		val, err := ffjson.Marshal(v)
		if err != nil {
			return nil // Just skip this one
		}
		if hasPrevField {
			buf.WriteByte(',')
		} else {
			hasPrevField = true
		}
		fflib.WriteJson(buf, []byte(k))
		buf.WriteByte(':')
		buf.Write(val) // Already marshalled into json. Don't escape it again.
		ffjson.Pool(val)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	for _, v := range w.columns {
		if val, ok := log.GetResourceValue(string(v)); ok {
			if s, ok := val.(string); ok {
				fields = append(fields, s)
			} else {
				// FIXME: see if we can convert the value to a string
				fields = append(fields, "")
			}
		} else {
			fields = append(fields, "")
		}
	}

	// Serialize
	if err := w.enc.Write(fields); err != nil {
		return err
	}
	w.enc.Flush()

	return w.enc.Error()
}

func (w *NativeLogsCSVWriter) Reset() {
	w.w.Reset()
	w.buf.Reset()
	w.headerWritten = false
}

func (w *NativeLogsCSVWriter) Bytes() []byte {
	return w.w.Bytes()
}

// InitColumns initializes the labels that will be promoted to columns in the CSV file. This can be done
// once on the *Writer and subsequent calls are no-ops.
func (w *NativeLogsCSVWriter) InitColumns(columns []string) {
	if len(w.columns) > 0 {
		return
	}

	sortLower := make([][]byte, len(columns))
	for i, v := range columns {
		sortLower[i] = []byte(strings.ToLower(v))
	}
	sort.Slice(sortLower, func(i, j int) bool {
		return bytes.Compare(sortLower[i], sortLower[j]) < 0
	})
	w.columns = sortLower

	for _, v := range w.columns {
		w.fieldLookup[string(v)] = struct{}{}
	}
}

func (w *NativeLogsCSVWriter) SchemaHash() uint64 {
	return w.schemaHash
}
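
A minimal usage sketch, not part of the file above: it assumes the package imports as github.com/Azure/adx-mon/transform, and the promoted column names ("pod", "namespace") are placeholders. Constructing the *types.Log values is left to the collector pipeline.

package main

import (
	"bytes"
	"os"

	"github.com/Azure/adx-mon/collector/logs/types"
	"github.com/Azure/adx-mon/transform"
)

// writeLogsCSV serializes a batch of native logs into one CSV payload.
// The header row is emitted once, on the first MarshalNativeLog call.
func writeLogsCSV(logs []*types.Log) ([]byte, error) {
	var buf bytes.Buffer
	// "pod" and "namespace" are hypothetical resource labels promoted to columns.
	w := transform.NewCSVNativeLogsCSVWriter(&buf, []string{"pod", "namespace"})
	for _, l := range logs {
		if err := w.MarshalNativeLog(l); err != nil {
			return nil, err
		}
	}
	return w.Bytes(), nil
}

func main() {
	out, err := writeLogsCSV(nil) // batch construction elided; nil produces no rows
	if err != nil {
		panic(err)
	}
	os.Stdout.Write(out)
}

Call Reset between batches to clear the buffer and the header-written flag so the writer can be reused.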