transform/transformer.go (171 lines of code) (raw):
package transform
import (
"bytes"
"regexp"
"sync"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/prompb"
)
// RequestTransformer filters and rewrites Prometheus remote-write data. It decides which series are
// dropped (DropMetrics, KeepMetrics, KeepMetricsWithLabelValue, DefaultDropMetrics, AllowedDatabase),
// which labels are removed (DropLabels) and which labels are added or overwritten (AddLabels).
//
// The zero value is usable. AddLabels is snapshotted into internal state on the first call to
// TransformWriteRequest, TransformTimeSeries or WalkLabels; changing it afterwards has no effect.
type RequestTransformer struct {
	// DefaultDropMetrics, when true, drops every metric unless it is explicitly kept by KeepMetrics or
	// KeepMetricsWithLabelValue (and not matched by DropMetrics). When false, only metrics matching
	// DropMetrics are dropped.
	DefaultDropMetrics bool
	// KeepMetrics is a slice of regexes matched against the metric name. It is only consulted when
	// DefaultDropMetrics is true, and a metric that also matches a DropMetrics rule is still dropped:
	// explicit drop rules take precedence over keep rules.
	KeepMetrics []*regexp.Regexp
	// KeepMetricsWithLabelValue maps label-name regexes to label-value regexes. When DefaultDropMetrics
	// is true, a metric not matched by DropMetrics or KeepMetrics is still kept if any of its labels
	// matches both a name regex and the corresponding value regex.
	KeepMetricsWithLabelValue map[*regexp.Regexp]*regexp.Regexp
	// DropLabels maps metric-name regexes to label-name regexes. A label is dropped when the metric
	// name matches the key and the label name matches the value. The __name__ label is never dropped.
	DropLabels map[*regexp.Regexp]*regexp.Regexp
	// DropMetrics is a slice of regexes that drops metrics when the metric name matches, regardless of
	// DefaultDropMetrics and of any keep rule. The metric name format should match the Prometheus
	// naming style before the metric is translated to a Kusto table name.
	DropMetrics []*regexp.Regexp
	// AddLabels maps label names to label values that are added to every series. A label already
	// present on the series under one of these names is replaced by the configured value.
	AddLabels map[string]string
	// addLabels is AddLabels converted to prompb labels and sorted; built once by init.
	addLabels []*prompb.Label
	// AllowedDatabase, when non-empty, makes TransformWriteRequest keep only series whose
	// adxmon_database label value is a key of this map (a missing label counts as the empty string).
	AllowedDatabase map[string]struct{}
	// initOnce guards the one-time construction of derived state in init.
	initOnce sync.Once
}
// init lazily derives internal state from the exported configuration. Its body runs exactly once per
// RequestTransformer. It replaces nil DropLabels / KeepMetricsWithLabelValue maps with empty ones and
// converts AddLabels into the addLabels slice, ordered by prompb.Sort.
func (f *RequestTransformer) init() {
	f.initOnce.Do(func() {
		if f.DropLabels == nil {
			f.DropLabels = map[*regexp.Regexp]*regexp.Regexp{}
		}
		if f.KeepMetricsWithLabelValue == nil {
			f.KeepMetricsWithLabelValue = map[*regexp.Regexp]*regexp.Regexp{}
		}

		labels := make([]*prompb.Label, 0, len(f.AddLabels))
		for name, value := range f.AddLabels {
			labels = append(labels, &prompb.Label{Name: []byte(name), Value: []byte(value)})
		}
		// Map iteration order is random; sorting makes addLabels deterministic.
		prompb.Sort(labels)
		f.addLabels = labels
	})
}
// TransformWriteRequest filters and rewrites req in place and returns it.
//
// A series is dropped when ShouldDropMetric reports true for it. It is also dropped when
// AllowedDatabase is non-empty and the series' adxmon_database label value is not a key of
// AllowedDatabase; a missing label counts as the empty string. Every dropped series adds its sample
// count to MetricsDroppedTotal under the metric's name. Surviving series are passed through
// TransformTimeSeries and compacted, in their original order, to the front of req.Timeseries.
func (f *RequestTransformer) TransformWriteRequest(req *prompb.WriteRequest) *prompb.WriteRequest {
	f.init()

	var kept int
	for j := range req.Timeseries {
		v := req.Timeseries[j]
		name := prompb.MetricName(v)

		// First skip any metrics that should be dropped.
		if f.ShouldDropMetric(v, name) {
			metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(float64(len(v.Samples)))
			continue
		}

		if len(f.AllowedDatabase) > 0 {
			var db []byte
			for _, l := range v.Labels {
				if bytes.Equal(l.Name, []byte("adxmon_database")) {
					db = l.Value
					break
				}
			}
			if _, ok := f.AllowedDatabase[string(db)]; !ok {
				metrics.MetricsDroppedTotal.WithLabelValues(string(name)).Add(float64(len(v.Samples)))
				continue
			}
		}

		// Swap rather than overwrite. This keeps every *TimeSeries in the backing array exactly once:
		// dropped series end up beyond the new length, instead of being replaced by duplicate pointers
		// to kept series. NOTE(review): this matters if the spare capacity is later reused (e.g. by
		// request pooling); confirm against the prompb pool implementation.
		// Slot j has already been read into v, and later iterations only read slots > j.
		ts := f.TransformTimeSeries(v)
		req.Timeseries[kept], req.Timeseries[j] = ts, req.Timeseries[kept]
		kept++
	}
	req.Timeseries = req.Timeseries[:kept]
	return req
}
// TransformTimeSeries rewrites the labels of v in place and returns v.
//
// A label is removed in two cases:
//   - the metric name matches a DropLabels key and the label name matches the corresponding value;
//   - an AddLabels entry has the same name, in which case that entry replaces it.
//
// The __name__ label is never removed. The AddLabels entries are then appended and the resulting
// label set is sorted with prompb.Sort.
func (f *RequestTransformer) TransformTimeSeries(v *prompb.TimeSeries) *prompb.TimeSeries {
	f.init()

	// Resolve the metric name from the actual __name__ label instead of v.Labels[0]. The input is not
	// guaranteed to have __name__ first (labels are only sorted at the end of this function). Slot 0
	// can also be reassigned by the compaction below when its original label is removed. A series
	// without __name__ is matched against DropLabels as an empty name.
	var metricName []byte
	if len(f.DropLabels) > 0 {
		for _, l := range v.Labels {
			if bytes.Equal(l.Name, []byte("__name__")) {
				metricName = l.Value
				break
			}
		}
	}

	var kept int
nextLabel:
	for j, l := range v.Labels {
		// Never attempt to drop __name__ label as this is required to identify the metric.
		if !bytes.Equal(l.Name, []byte("__name__")) {
			// To drop a label, it has to match the metrics regex and the label regex.
			for metricRe, labelRe := range f.DropLabels {
				if metricRe.Match(metricName) && labelRe.Match(l.Name) {
					continue nextLabel
				}
			}
			// Skip any labels that will be overwritten by the add labels.
			for _, al := range f.addLabels {
				if bytes.Equal(l.Name, al.Name) {
					continue nextLabel
				}
			}
		}
		// Swap instead of overwrite so each *Label in the backing array appears exactly once. Removed
		// labels end up beyond the new length rather than leaving duplicate pointers to kept labels
		// there. NOTE(review): relevant if AppendLabel recycles spare capacity — confirm in prompb.
		v.Labels[kept], v.Labels[j] = v.Labels[j], v.Labels[kept]
		kept++
	}
	v.Labels = v.Labels[:kept]

	for _, al := range f.addLabels {
		v.AppendLabel(al.Name, al.Value)
	}
	prompb.Sort(v.Labels)
	return v
}
// WalkLabels applies the same label rules as TransformTimeSeries but does not modify v. Instead, it
// calls callback once for each label of v that TransformTimeSeries would keep, in v's existing order.
// It then calls callback once per AddLabels entry, in the order init sorted them. Unlike
// TransformTimeSeries, the combined sequence is not re-sorted.
// This is safe to call in parallel if the name and value bytes are not modified by the callback.
func (f *RequestTransformer) WalkLabels(v *prompb.TimeSeries, callback func(name []byte, value []byte)) {
	f.init()

	// Resolve the metric name from the actual __name__ label instead of assuming it is v.Labels[0],
	// which only holds when __name__ sorts first. A series without __name__ is matched against
	// DropLabels as an empty name.
	var metricName []byte
	if len(f.DropLabels) > 0 {
		for _, l := range v.Labels {
			if bytes.Equal(l.Name, []byte("__name__")) {
				metricName = l.Value
				break
			}
		}
	}

nextLabel:
	for _, l := range v.Labels {
		// Never attempt to drop __name__ label as this is required to identify the metric.
		if !bytes.Equal(l.Name, []byte("__name__")) {
			// To drop a label, it has to match the metrics regex and the label regex.
			for metricRe, labelRe := range f.DropLabels {
				if metricRe.Match(metricName) && labelRe.Match(l.Name) {
					continue nextLabel
				}
			}
			// Skip any labels that will be overwritten by the add labels.
			for _, al := range f.addLabels {
				if bytes.Equal(l.Name, al.Name) {
					continue nextLabel
				}
			}
		}
		callback(l.Name, l.Value)
	}

	for _, al := range f.addLabels {
		callback(al.Name, al.Value)
	}
}
// ShouldDropMetric reports whether the series v, whose metric name is name, is filtered out by the
// metric rules.
//
// A DropMetrics match always drops. Otherwise, when DefaultDropMetrics is false the series is kept.
// When DefaultDropMetrics is true the series is kept only if name matches a KeepMetrics regex, or one
// of its labels matches a KeepMetricsWithLabelValue name/value pair.
func (f *RequestTransformer) ShouldDropMetric(v *prompb.TimeSeries, name []byte) bool {
	anyMatch := func(patterns []*regexp.Regexp) bool {
		for _, re := range patterns {
			if re.Match(name) {
				return true
			}
		}
		return false
	}

	// Explicitly dropped metrics take precedence over explicitly kept metrics.
	if anyMatch(f.DropMetrics) {
		return true
	}
	if !f.DefaultDropMetrics {
		return false
	}
	if anyMatch(f.KeepMetrics) {
		return false
	}
	if len(f.KeepMetricsWithLabelValue) == 0 {
		return true
	}

	// Keep metrics that carry a label matching one of the configured name/value pairs.
	for _, label := range v.Labels {
		for nameRe, valueRe := range f.KeepMetricsWithLabelValue {
			if nameRe.Match(label.Name) && valueRe.Match(label.Value) {
				return false
			}
		}
	}
	return true
}