transform/otlp_logs_csv.go (153 lines of code) (raw):

package transform import ( "bytes" "encoding/csv" "sort" "strconv" "strings" "time" commonv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/common/v1" "github.com/Azure/adx-mon/pkg/otlp" fflib "github.com/pquerna/ffjson/fflib/v1" ) var ( // Logs often contain unespcaped newlines, particularly at the end of a log line // but also in case of stacktraces. newlineReplacer = strings.NewReplacer("\r\n", "%0D%0A", "\n", "%0A") ) type CSVWriter struct { w *bytes.Buffer buf *strings.Builder enc *csv.Writer labelsBuf *bytes.Buffer seriesIdBuf *bytes.Buffer line []byte columns [][]byte fields []string } // NewCSVWriter returns a new CSVWriter that writes to the given buffer. The columns, if specified, are // label keys that will be promoted to columns. func NewCSVWriter(w *bytes.Buffer, columns []string) *CSVWriter { writer := &CSVWriter{ w: w, buf: &strings.Builder{}, seriesIdBuf: bytes.NewBuffer(make([]byte, 0, 1024)), labelsBuf: bytes.NewBuffer(make([]byte, 0, 1024)), enc: csv.NewWriter(w), line: make([]byte, 0, 4096), columns: make([][]byte, 0, len(columns)), fields: make([]string, 0, 4+len(columns)), } writer.InitColumns(columns) return writer } func (w *CSVWriter) MarshalLog(logs *otlp.Logs) error { // See ingestor/storage/schema::NewLogsSchema // we're writing a ExportLogsServiceRequest as a CSV // There are 9 fields defined in an OTLP log schema fields := make([]string, 0, 9) // Convert log records to CSV // see samples at https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#examples for _, l := range logs.Logs { // Reset fields fields = fields[:0] // Timestamp fields = append(fields, otlpTSToUTC(int64(l.GetTimeUnixNano()))) // ObservedTimestamp if v := l.GetObservedTimeUnixNano(); v > 0 { // Some clients don't set this value. fields = append(fields, otlpTSToUTC(int64(l.GetObservedTimeUnixNano()))) } else { fields = append(fields, time.Now().UTC().Format(time.RFC3339Nano)) } // TraceId fields = append(fields, string(l.GetTraceId())) // SpanId fields = append(fields, string(l.GetSpanId())) // SeverityText fields = append(fields, l.GetSeverityText()) // SeverityNumber fields = append(fields, l.GetSeverityNumber().String()) // Body buf := w.buf serializeAnyValue(buf, l.GetBody(), 0) fields = append(fields, buf.String()) // Resource buf.Reset() buf.WriteByte('{') for _, r := range logs.Resources { if buf.String()[buf.Len()-1] != '{' { buf.WriteByte(',') } fflib.WriteJson(buf, []byte(r.GetKey())) buf.WriteByte(':') fflib.WriteJson(buf, []byte(r.GetValue().GetStringValue())) } buf.WriteByte('}') fields = append(fields, buf.String()) // Attributes buf.Reset() buf.WriteByte('{') for _, a := range l.GetAttributes() { if buf.String()[buf.Len()-1] != '{' { buf.WriteByte(',') } fflib.WriteJson(buf, []byte(a.GetKey())) buf.WriteByte(':') fflib.WriteJson(buf, []byte(a.GetValue().GetStringValue())) } buf.WriteByte('}') fields = append(fields, buf.String()) // Serialize if err := w.enc.Write(fields); err != nil { return err } w.enc.Flush() } return w.enc.Error() } func (w *CSVWriter) Reset() { w.w.Reset() w.buf.Reset() } func (w *CSVWriter) Bytes() []byte { return w.w.Bytes() } // InitColumns initializes the labels that will be promoted to columns in the CSV file. This can be done // once on the *Writer and subsequent calls are no-ops. func (w *CSVWriter) InitColumns(columns []string) { if len(w.columns) > 0 { return } sortLower := make([][]byte, len(columns)) for i, v := range columns { sortLower[i] = []byte(strings.ToLower(v)) } sort.Slice(sortLower, func(i, j int) bool { return bytes.Compare(sortLower[i], sortLower[j]) < 0 }) w.columns = sortLower } const maxNestedDepth = 100 func serializeAnyValue(buf *strings.Builder, v *commonv1.AnyValue, depth int) { if depth == 0 { buf.Reset() } if depth > maxNestedDepth { buf.WriteString("...") return } switch v.GetValue().(type) { case *commonv1.AnyValue_StringValue: // In the case of unstructured text, the top-level Body object // is just a simple string, so there is no need to WriteJson if depth == 0 { buf.WriteString(v.GetStringValue()) return } fflib.WriteJson(buf, []byte(v.GetStringValue())) case *commonv1.AnyValue_BoolValue: fflib.WriteJson(buf, []byte(strconv.FormatBool(v.GetBoolValue()))) case *commonv1.AnyValue_IntValue: fflib.WriteJson(buf, []byte(strconv.FormatInt(v.GetIntValue(), 10))) case *commonv1.AnyValue_DoubleValue: fflib.WriteJson(buf, []byte(strconv.FormatFloat(v.GetDoubleValue(), 'f', -1, 64))) case *commonv1.AnyValue_KvlistValue: buf.WriteByte('{') for i, kv := range v.GetKvlistValue().GetValues() { if i != 0 { buf.WriteByte(',') } fflib.WriteJson(buf, []byte(kv.GetKey())) buf.WriteByte(':') serializeAnyValue(buf, kv.GetValue(), depth+1) } buf.WriteByte('}') case *commonv1.AnyValue_ArrayValue: buf.WriteByte('[') for i, v := range v.GetArrayValue().GetValues() { if i != 0 { buf.WriteByte(',') } serializeAnyValue(buf, v, depth+1) } buf.WriteByte(']') default: fflib.WriteJson(buf, []byte(v.GetStringValue())) } }