in transform/native_logs_csv.go [69:226]
// MarshalNativeLog serializes a single log as one CSV record and flushes it
// through w.enc. On the first call it also writes the CSV header (built from
// w.schema) directly to the underlying writer w.w.
//
// The record holds the nine OTLP log fields, in order: Timestamp,
// ObservedTimestamp, TraceId, SpanId, SeverityText, SeverityNumber, Body,
// Resource and Attributes. They are followed by one field per entry in
// w.columns, populated from the log's resource values. Body, Resource and
// Attributes are each encoded as a JSON object.
// See samples at https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#examples
//
// It returns any error from writing the header (including a short write),
// from iterating the log's body/resource/attributes, or from the CSV encoder.
func (w *NativeLogsCSVWriter) MarshalNativeLog(log *types.Log) error {
	if !w.headerWritten {
		line := schema.AppendCSVHeader(w.line[:0], w.schema)
		n, err := w.w.Write(line)
		if err != nil {
			return err
		}
		if n != len(line) {
			return errors.New("short write")
		}
		w.headerWritten = true
	}

	// There are 9 fields defined in an OTLP log schema, plus one extra field
	// for every configured column.
	fields := make([]string, 0, 9+len(w.columns))

	// Timestamp
	fields = append(fields, otlpTSToUTC(int64(log.GetTimestamp())))

	// ObservedTimestamp
	if observed := log.GetObservedTimestamp(); observed > 0 {
		fields = append(fields, otlpTSToUTC(int64(observed)))
	} else {
		// Some clients don't set this value; fall back to the current time.
		fields = append(fields, time.Now().UTC().Format(time.RFC3339Nano))
	}

	// TraceId, SpanId, SeverityText, SeverityNumber - we don't have these.
	fields = append(fields, "", "", "", "")

	buf := w.buf

	// Body
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField := false
	err := log.ForEachBody(func(k string, v any) error {
		hasPrevField = w.appendJSONMember(k, v, hasPrevField)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	// Resource
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField = false
	err = log.ForEachResource(func(k string, v any) error {
		// Keys present in w.fieldLookup are emitted as their own columns
		// below, so they are left out of the Resource object.
		_, lifted := w.fieldLookup[k]
		// These are added by collector and used internally. Don't store them in the final table.
		if lifted || strings.HasPrefix(k, "adxmon_") || strings.HasPrefix(k, "label.") || strings.HasPrefix(k, "annotation.") {
			return nil
		}
		hasPrevField = w.appendJSONMember(k, v, hasPrevField)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	// Attributes
	buf.Reset()
	buf.WriteByte('{')
	hasPrevField = false
	err = log.ForEachAttribute(func(k string, v any) error {
		// Internal collector keys are not stored in the final table.
		if strings.HasPrefix(k, "adxmon_") {
			return nil
		}
		hasPrevField = w.appendJSONMember(k, v, hasPrevField)
		return nil
	})
	if err != nil {
		return err
	}
	buf.WriteByte('}')
	fields = append(fields, buf.String())

	// Extra columns: one field per entry in w.columns, always appended so the
	// record keeps a fixed width even when the value is missing.
	for _, v := range w.columns {
		s := ""
		if val, ok := log.GetResourceValue(string(v)); ok {
			// FIXME: see if we can convert non-string values to a string;
			// for now they are written as empty fields.
			if str, isString := val.(string); isString {
				s = str
			}
		}
		fields = append(fields, s)
	}

	// Serialize
	if err := w.enc.Write(fields); err != nil {
		return err
	}
	w.enc.Flush()
	return w.enc.Error()
}

// appendJSONMember marshals v to JSON and appends `"k":<value>` to w.buf as a
// member of the JSON object currently being built there, preceded by a comma
// when hasPrev is true. Values that fail to marshal are skipped. It returns
// whether the object has at least one member after the call.
func (w *NativeLogsCSVWriter) appendJSONMember(k string, v any, hasPrev bool) bool {
	val, err := ffjson.Marshal(v)
	if err != nil {
		return hasPrev // Just skip this one
	}
	if hasPrev {
		w.buf.WriteByte(',')
	}
	fflib.WriteJson(w.buf, []byte(k))
	w.buf.WriteByte(':')
	w.buf.Write(val) // Already marshalled into json. Don't escape it again.
	// Return the marshal buffer to ffjson's pool; val must not be used after this.
	ffjson.Pool(val)
	return true
}