func()

in transform/native_logs_csv.go [69:226]


// MarshalNativeLog encodes log as one CSV record and writes it through the
// writer's CSV encoder. On the first call it also emits the CSV header built
// from w.schema directly to the underlying writer.
//
// The record starts with the nine OTLP log columns, in this order: Timestamp,
// ObservedTimestamp, TraceId, SpanId, SeverityText, SeverityNumber, Body,
// Resource and Attributes. TraceId, SpanId, SeverityText and SeverityNumber
// are always written empty. Body, Resource and Attributes are each rendered
// as a JSON object. One extra column follows for every entry in w.columns,
// populated from the resource value of the same name.
//
// It returns an error if the header write fails or is short, if iterating the
// log's body/resource/attributes fails, or if the CSV encoder reports an
// error while writing or flushing the record.
func (w *NativeLogsCSVWriter) MarshalNativeLog(log *types.Log) error {
	if !w.headerWritten {
		line := schema.AppendCSVHeader(w.line[:0], w.schema)

		n, err := w.w.Write(line)
		if err != nil {
			return err
		}
		if n != len(line) {
			return errors.New("short write")
		}
		w.headerWritten = true
	}

	// There are 9 fields defined in an OTLP log schema; lifted columns are
	// appended after them, so reserve room for both up front.
	const otlpFieldCount = 9
	fields := make([]string, 0, otlpFieldCount+len(w.columns))

	// Convert log records to CSV
	// see samples at https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#examples

	// Timestamp
	fields = append(fields, otlpTSToUTC(int64(log.GetTimestamp())))

	// ObservedTimestamp
	if v := log.GetObservedTimestamp(); v > 0 {
		fields = append(fields, otlpTSToUTC(int64(v)))
	} else {
		// Some clients don't set this value; fall back to the current time.
		fields = append(fields, time.Now().UTC().Format(time.RFC3339Nano))
	}

	// TraceId, SpanId, SeverityText, SeverityNumber - we don't have these
	fields = append(fields, "", "", "", "")

	// The three JSON columns share one scratch buffer and one set of helpers.
	buf := w.buf
	hasPrevField := false

	// beginObject resets the scratch buffer and opens a new JSON object.
	beginObject := func() {
		buf.Reset()
		buf.WriteByte('{')
		hasPrevField = false
	}

	// endObject closes the current JSON object and returns it as a string.
	endObject := func() string {
		buf.WriteByte('}')
		return buf.String()
	}

	// appendMember writes `"k":<json of v>` to the current object, preceded by
	// a comma when it is not the first member. Values that cannot be
	// marshalled are skipped on purpose so one bad value does not drop the
	// whole record.
	appendMember := func(k string, v any) error {
		val, err := ffjson.Marshal(v)
		if err != nil {
			return nil // Just skip this one
		}

		if hasPrevField {
			buf.WriteByte(',')
		} else {
			hasPrevField = true
		}
		fflib.WriteJson(buf, []byte(k))
		buf.WriteByte(':')
		buf.Write(val) // Already marshalled into json. Don't escape it again.
		ffjson.Pool(val)
		return nil
	}

	// Body
	beginObject()
	if err := log.ForEachBody(appendMember); err != nil {
		return err
	}
	fields = append(fields, endObject())

	// Resource
	beginObject()
	err := log.ForEachResource(func(k string, v any) error {
		_, lifted := w.fieldLookup[k]

		// These are added by collector and used internally.  Don't store them in the final table.
		// Lifted keys are skipped here as well because they get their own column below.
		if strings.HasPrefix(k, "adxmon_") || strings.HasPrefix(k, "label.") || strings.HasPrefix(k, "annotation.") || lifted {
			return nil
		}
		return appendMember(k, v)
	})
	if err != nil {
		return err
	}
	fields = append(fields, endObject())

	// Attributes
	beginObject()
	err = log.ForEachAttribute(func(k string, v any) error {
		if strings.HasPrefix(k, "adxmon_") {
			return nil
		}
		return appendMember(k, v)
	})
	if err != nil {
		return err
	}
	fields = append(fields, endObject())

	// Lifted columns: one field per configured column, empty when the resource
	// value is missing or is not a string.
	for _, col := range w.columns {
		lifted := ""
		if val, ok := log.GetResourceValue(string(col)); ok {
			// FIXME: see if we can convert the value to a string
			lifted, _ = val.(string)
		}
		fields = append(fields, lifted)
	}

	// Serialize
	if err := w.enc.Write(fields); err != nil {
		return err
	}

	w.enc.Flush()
	return w.enc.Error()
}