in transform/metrics_csv.go [67:184]
// MarshalCSV writes one CSV row per sample in ts to the underlying writer.
//
// If the header has not been written yet, the header derived from w.schema is
// written first. Each row then carries, in order: the sample timestamp
// (RFC 3339 with nanoseconds, UTC), the series ID (xxhash of every label name
// and value, rendered as a signed 64-bit integer), the remaining labels as a
// JSON object, the sample value with nine decimal places, and one field per
// lifted column in w.columns.
//
// Labels named __name__, labels prefixed with adxmon_, and labels matching a
// lifted column are left out of the JSON object, but they still contribute to
// the series ID.
//
// NOTE(review): both merge walks below assume ts.Labels and w.columns are
// sorted in the order defined by prompb.CompareLower — confirm with callers.
//
// It returns the writer's error, or a "short write" error when the writer
// accepts fewer bytes than were handed to it.
func (w *MetricsCSVWriter) MarshalCSV(ts *prompb.TimeSeries) error {
	if !w.headerWritten {
		line := schema.AppendCSVHeader(w.line[:0], w.schema)
		// Keep any capacity gained while building the header so later rows
		// do not have to grow the scratch buffer again.
		w.line = line[:0]
		n, err := w.w.Write(line)
		if err != nil {
			return err
		}
		if n != len(line) {
			return errors.New("short write")
		}
		w.headerWritten = true
	}

	labels := w.labelsBuf
	labels.Reset()
	seriesIDBuf := w.seriesIdBuf
	seriesIDBuf.Reset()

	// Marshal the labels as JSON by hand and avoid allocations since this code is in the hot path.
	labels.WriteByte('{')
	var col int // cursor into w.columns, only ever moves forward
	for _, l := range ts.Labels {
		// Every label feeds the series ID, including the ones dropped from the JSON below.
		seriesIDBuf.Write(l.Name)
		seriesIDBuf.Write(l.Value)

		// Drop the __name__ label since it is implied that the name of the CSV file is the name of the metric.
		// Labels prefixed with adxmon_ are dropped from the JSON as well.
		if bytes.Equal(l.Name, []byte("__name__")) || bytes.HasPrefix(l.Name, []byte("adxmon_")) {
			continue
		}

		// We need to drop the labels that have been promoted to columns to avoid duplicating the storage of the
		// value in both the Labels column and the lifted column. The columns are sorted so we can walk them in
		// order and skip any matches.
		var lifted bool
		for col < len(w.columns) {
			cmp := prompb.CompareLower(w.columns[col], l.Name)
			if cmp < 0 {
				// The lifted column sorts before the current label; move to the next column and check again.
				col++
				continue
			}
			if cmp == 0 {
				// The lifted column matches the current label; consume the column and skip the label.
				col++
				lifted = true
			}
			// Either we matched, or the lifted column sorts after the current label: stop looking.
			break
		}
		if lifted {
			continue
		}

		// Anything other than the opening brace as the last byte means a pair was already emitted.
		if labels.Bytes()[labels.Len()-1] != '{' {
			labels.WriteByte(',')
		}
		fflib.WriteJson(labels, l.Name)
		labels.WriteByte(':')
		fflib.WriteJson(labels, l.Value)
	}
	labels.WriteByte('}')

	// These are identical for every sample of the series, so compute them once.
	seriesID := int64(xxhash.Sum64(seriesIDBuf.Bytes()))
	labelsJSON := labels.Bytes()

	for _, s := range ts.Samples {
		line := w.line[:0]

		// Timestamp: the sample value is split into whole seconds and a millisecond remainder.
		line = time.Unix(s.Timestamp/1000, (s.Timestamp%1000)*int64(time.Millisecond)).UTC().AppendFormat(line, time.RFC3339Nano)

		// seriesID
		line = append(line, ',')
		line = strconv.AppendInt(line, seriesID, 10)

		// labels
		// NOTE(review): AppendQuoted and Append are presumed to emit their own leading separator, which is why
		// no bare comma is added in front of them — confirm against the adxcsv package.
		line = adxcsv.AppendQuoted(line, labelsJSON)
		line = append(line, ',')

		// Values
		line = strconv.AppendFloat(line, s.Value, 'f', 9, 64)

		// Lifted columns: merge-walk the labels and the columns, emitting the label value when the names match
		// and an empty field when the series has no label for a column.
		var li, ci int
		for li < len(ts.Labels) && ci < len(w.columns) {
			switch cmp := prompb.CompareLower(ts.Labels[li].Name, w.columns[ci]); {
			case cmp == 0:
				line = adxcsv.Append(line, ts.Labels[li].Value)
				li++
				ci++
			case cmp > 0:
				// The label sorts after the column, so this series has no value for it.
				line = append(line, ',')
				ci++
			default:
				li++
			}
		}
		// Any columns left over once the labels are exhausted are empty too.
		for ; ci < len(w.columns); ci++ {
			line = append(line, ',')
		}

		// End of line
		line = adxcsv.AppendNewLine(line)

		// Retain the (possibly grown) scratch buffer so the next row reuses it instead of reallocating.
		w.line = line[:0]

		n, err := w.w.Write(line)
		if err != nil {
			return err
		}
		if n != len(line) {
			return errors.New("short write")
		}
	}
	return nil
}