transform/metrics_csv.go

package transform

import (
	"bytes"
	"encoding/csv"
	"errors"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/Azure/adx-mon/pkg/prompb"
	"github.com/Azure/adx-mon/schema"
	"github.com/cespare/xxhash"
	fflib "github.com/pquerna/ffjson/fflib/v1"

	adxcsv "github.com/Azure/adx-mon/pkg/csv"
)

type MetricsCSVWriter struct {
	w           *bytes.Buffer
	buf         *strings.Builder
	enc         *csv.Writer
	labelsBuf   *bytes.Buffer
	seriesIdBuf *bytes.Buffer
	line        []byte
	columns     [][]byte
	lifted      Fields

	headerWritten bool
	schemaHash    uint64
	schema        schema.SchemaMapping
}

// NewMetricsCSVWriter returns a new MetricsCSVWriter that writes to the given buffer. The columns, if specified,
// are label keys that will be promoted to columns.
func NewMetricsCSVWriter(w *bytes.Buffer, lifted Fields) *MetricsCSVWriter {
	return NewMetricsCSVWriterWithSchema(w, lifted, schema.DefaultMetricsMapping)
}

// NewMetricsCSVWriterWithSchema returns a new MetricsCSVWriter that writes to the given buffer using the given
// schema mapping. The columns, if specified, are label keys that will be promoted to columns.
func NewMetricsCSVWriterWithSchema(w *bytes.Buffer, lifted Fields, mapping schema.SchemaMapping) *MetricsCSVWriter {
	writer := &MetricsCSVWriter{
		w:           w,
		buf:         &strings.Builder{},
		seriesIdBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
		labelsBuf:   bytes.NewBuffer(make([]byte, 0, 1024)),
		enc:         csv.NewWriter(w),
		line:        make([]byte, 0, 4096),
		columns:     make([][]byte, 0, len(lifted)),
		lifted:      lifted,
		schemaHash:  schema.SchemaHash(mapping),
		schema:      mapping,
	}

	columns := make([]string, 0, len(lifted))
	for _, v := range lifted {
		columns = append(columns, v.Source)
	}
	writer.InitColumns(columns)

	return writer
}

// MarshalCSV writes ts as one CSV row per sample to the underlying buffer, emitting the schema header first
// if it has not been written yet.
func (w *MetricsCSVWriter) MarshalCSV(ts *prompb.TimeSeries) error {
	if !w.headerWritten {
		line := w.line[:0]
		line = schema.AppendCSVHeader(line, w.schema)
		if n, err := w.w.Write(line); err != nil {
			return err
		} else if n != len(line) {
			return errors.New("short write")
		}
		w.headerWritten = true
	}

	buf := w.labelsBuf
	buf.Reset()

	seriesIdBuf := w.seriesIdBuf
	seriesIdBuf.Reset()

	var j int

	// Marshal the labels as JSON and avoid allocations since this code is in the hot path.
	buf.WriteByte('{')
	for _, v := range ts.Labels {
		w.seriesIdBuf.Write(v.Name)
		w.seriesIdBuf.Write(v.Value)

		// Drop the __name__ label since it is implied that the name of the CSV file is the name of the metric.
		if bytes.Equal(v.Name, []byte("__name__")) || bytes.HasPrefix(v.Name, []byte("adxmon_")) {
			continue
		}

		// Drop labels that have been promoted to columns to avoid storing the value in both the Labels column
		// and the lifted column. The columns are sorted so we can walk them in order and skip any matches.
		var skip bool
		for j < len(w.columns) {
			cmp := prompb.CompareLower(w.columns[j], v.Name)
			// The lifted column is less than the current label; move to the next column and check again.
			if cmp < 0 {
				j++
				continue
			} else if cmp == 0 {
				// The lifted column matches the current label, so skip it.
				j++
				skip = true
				break
			}

			// The lifted column is greater than the current label; stop looking.
			break
		}

		if skip {
			continue
		}

		if buf.Bytes()[buf.Len()-1] != '{' {
			buf.WriteByte(',')
		}
		fflib.WriteJson(buf, v.Name)
		buf.WriteByte(':')
		fflib.WriteJson(buf, v.Value)
	}
	buf.WriteByte('}')

	seriesId := xxhash.Sum64(seriesIdBuf.Bytes())

	for _, v := range ts.Samples {
		line := w.line[:0]

		// Timestamp
		line = time.Unix(v.Timestamp/1000, (v.Timestamp%1000)*int64(time.Millisecond)).UTC().AppendFormat(line, time.RFC3339Nano)

		// seriesID
		line = append(line, ',')
		line = strconv.AppendInt(line, int64(seriesId), 10)

		// labels
		line = adxcsv.AppendQuoted(line, buf.Bytes())
		line = append(line, ',')

		// Values
		line = strconv.AppendFloat(line, v.Value, 'f', 9, 64)

		if len(w.columns) > 0 {
			var i, j int
			for i < len(ts.Labels) && j < len(w.columns) {
				cmp := prompb.CompareLower(ts.Labels[i].Name, w.columns[j])
				if cmp == 0 {
					line = adxcsv.Append(line, ts.Labels[i].Value)
					j++
					i++
				} else if cmp > 0 {
					j++
					line = append(line, ',')
				} else {
					i++
				}
			}

			for j < len(w.columns) {
				line = append(line, ',')
				j++
			}
		}

		// End of line
		line = adxcsv.AppendNewLine(line)

		if n, err := w.w.Write(line); err != nil {
			return err
		} else if n != len(line) {
			return errors.New("short write")
		}
	}
	return nil
}

// Reset clears the underlying buffers so the writer can be reused for a new CSV payload.
func (w *MetricsCSVWriter) Reset() {
	w.w.Reset()
	w.buf.Reset()
	w.headerWritten = false
}

// Bytes returns the accumulated CSV bytes.
func (w *MetricsCSVWriter) Bytes() []byte {
	return w.w.Bytes()
}

// InitColumns initializes the labels that will be promoted to columns in the CSV file. This can be done
// once on the *Writer and subsequent calls are no-ops.
func (w *MetricsCSVWriter) InitColumns(columns []string) {
	if len(w.columns) > 0 {
		return
	}

	sortLower := make([][]byte, len(columns))
	for i, v := range columns {
		sortLower[i] = []byte(strings.ToLower(v))
	}
	sort.Slice(sortLower, func(i, j int) bool {
		return bytes.Compare(sortLower[i], sortLower[j]) < 0
	})
	w.columns = sortLower
}

// SchemaHash returns the hash of the schema mapping used by this writer.
func (w *MetricsCSVWriter) SchemaHash() uint64 {
	return w.schemaHash
}
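
Below is a minimal usage sketch, not part of the file above: it assumes the caller already holds a populated *prompb.TimeSeries (for example, decoded from a remote-write request), passes nil for the lifted Fields so no labels are promoted to columns, and lives in the transform package. The helper name marshalSeries is hypothetical.

func marshalSeries(ts *prompb.TimeSeries) ([]byte, error) {
	var buf bytes.Buffer
	w := NewMetricsCSVWriter(&buf, nil) // nil Fields: no labels lifted to dedicated columns

	// Writes the CSV header on first use, then one row per sample in ts.
	if err := w.MarshalCSV(ts); err != nil {
		return nil, err
	}

	// Bytes aliases the writer's internal buffer, so copy it if the payload
	// needs to outlive the next Reset.
	out := append([]byte(nil), w.Bytes()...)
	w.Reset() // the writer can now be reused for the next batch
	return out, nil
}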