collector/export/metric_otlp.go (233 lines of code) (raw):
package export
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
v1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/metrics/v1"
commonv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/common/v1"
metricsv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/metrics/v1"
resourcev1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/resource/v1"
"github.com/Azure/adx-mon/metrics"
adxhttp "github.com/Azure/adx-mon/pkg/http"
"github.com/Azure/adx-mon/pkg/pool"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/transform"
"github.com/golang/protobuf/proto"
)
var (
// nameLabel is the label key that carries the metric name. Labels with this
// key are skipped when series labels are converted to OTLP attributes,
// because the name is emitted as the metric's Name instead.
nameLabel = []byte("__name__")
)
// PromToOtlpExporter clones Prometheus WriteRequests to an OTLP/HTTP endpoint.
// Implements RemoteWriteClient.
type PromToOtlpExporter struct {
// httpClient performs the POST of each serialized export request.
httpClient *http.Client
// destination is the URL export requests are POSTed to. It is also used as a
// label value on the exporter sent/failed metrics.
destination string
// transformer decides which timeseries are dropped and walks the labels of
// the series that are kept.
transformer *transform.RequestTransformer
// resourceAttributes are attached to the Resource of every export request.
resourceAttributes []*commonv1.KeyValue
// stringKVPool recycles string-valued KeyValue attributes across conversions.
stringKVPool *pool.Generic
// datapointPool recycles double-valued NumberDataPoints across conversions.
datapointPool *pool.Generic
}
// PromToOtlpExporterOpts configures NewPromToOtlpExporter. Zero-valued
// timeouts and connection limits are replaced by WithDefaults.
type PromToOtlpExporterOpts struct {
// Transformer is used to drop timeseries and to walk the labels of the
// timeseries that are exported.
Transformer *transform.RequestTransformer
// Destination is the URL that export requests are POSTed to.
Destination string
// AddResourceAttributes are added as string-valued attributes on the
// Resource of every export request.
AddResourceAttributes map[string]string
// Close controls whether the client closes the connection after each request.
Close bool
// Timeout is the timeout for the http client and the http request.
Timeout time.Duration
// InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name.
InsecureSkipVerify bool
// IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection
// will remain idle before closing itself.
IdleConnTimeout time.Duration
// ResponseHeaderTimeout is the amount of time to wait for a server's response headers
// after fully writing the request (including its body, if any).
ResponseHeaderTimeout time.Duration
// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) per host.
MaxIdleConnsPerHost int
// MaxConnsPerHost, if non-zero, controls the maximum connections per host.
MaxConnsPerHost int
// TLSHandshakeTimeout specifies the maximum amount of time to
// wait for a TLS handshake. Zero means no timeout.
TLSHandshakeTimeout time.Duration
// DisableHTTP2 controls whether the client disables HTTP/2 support.
DisableHTTP2 bool
// DisableKeepAlives controls whether the client disables HTTP keep-alives.
DisableKeepAlives bool
}
// WithDefaults returns a copy of c in which every zero-valued timeout and
// connection limit has been replaced by its default. Fields that are already
// set, and fields that have no default, are returned unchanged.
func (c PromToOtlpExporterOpts) WithDefaults() PromToOtlpExporterOpts {
	const (
		defaultTimeout               = 10 * time.Second
		defaultIdleConnTimeout       = 1 * time.Minute
		defaultResponseHeaderTimeout = 10 * time.Second
		defaultTLSHandshakeTimeout   = 10 * time.Second

		defaultMaxIdleConns        = 100
		defaultMaxIdleConnsPerHost = 5
		defaultMaxConnsPerHost     = 5
	)

	// c is a value receiver, so the pointers below address the local copy only.
	durationDefaults := []struct {
		field    *time.Duration
		fallback time.Duration
	}{
		{&c.Timeout, defaultTimeout},
		{&c.IdleConnTimeout, defaultIdleConnTimeout},
		{&c.ResponseHeaderTimeout, defaultResponseHeaderTimeout},
		{&c.TLSHandshakeTimeout, defaultTLSHandshakeTimeout},
	}
	for _, d := range durationDefaults {
		if *d.field == 0 {
			*d.field = d.fallback
		}
	}

	limitDefaults := []struct {
		field    *int
		fallback int
	}{
		{&c.MaxIdleConns, defaultMaxIdleConns},
		{&c.MaxIdleConnsPerHost, defaultMaxIdleConnsPerHost},
		{&c.MaxConnsPerHost, defaultMaxConnsPerHost},
	}
	for _, l := range limitDefaults {
		if *l.field == 0 {
			*l.field = l.fallback
		}
	}

	return c
}
// NewPromToOtlpExporter builds a PromToOtlpExporter from opts after applying
// WithDefaults. It creates the HTTP client, converts AddResourceAttributes into
// string-valued resource attributes, and sets up the pools from which
// attributes and data points are drawn during conversion.
func NewPromToOtlpExporter(opts PromToOtlpExporterOpts) *PromToOtlpExporter {
	opts = opts.WithDefaults()

	// NOTE(review): MaxConnsPerHost and TLSHandshakeTimeout receive defaults in
	// WithDefaults but are not forwarded to the HTTP client here — confirm
	// whether that omission is intentional.
	clientOpts := adxhttp.ClientOpts{
		Close:                 opts.Close,
		Timeout:               opts.Timeout,
		InsecureSkipVerify:    opts.InsecureSkipVerify,
		IdleConnTimeout:       opts.IdleConnTimeout,
		ResponseHeaderTimeout: opts.ResponseHeaderTimeout,
		MaxIdleConns:          opts.MaxIdleConns,
		MaxIdleConnsPerHost:   opts.MaxIdleConnsPerHost,
		DisableHTTP2:          opts.DisableHTTP2,
		DisableKeepAlives:     opts.DisableKeepAlives,
	}
	httpClient := adxhttp.NewClient(clientOpts)

	// A nil map yields a nil slice; a non-nil (possibly empty) map yields a
	// non-nil slice.
	var resourceAttrs []*commonv1.KeyValue
	if opts.AddResourceAttributes != nil {
		resourceAttrs = make([]*commonv1.KeyValue, 0, len(opts.AddResourceAttributes))
		for key, val := range opts.AddResourceAttributes {
			strVal := &commonv1.AnyValue_StringValue{StringValue: val}
			kv := &commonv1.KeyValue{
				Key:   key,
				Value: &commonv1.AnyValue{Value: strVal},
			}
			resourceAttrs = append(resourceAttrs, kv)
		}
	}

	// Pooled attributes are always created with a string value so the
	// conversion code can overwrite it in place.
	newStringKV := func(sz int) interface{} {
		return &commonv1.KeyValue{
			Key:   "",
			Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ""}},
		}
	}
	// Pooled data points are always created with a double value for the same
	// reason.
	newDataPoint := func(sz int) interface{} {
		return &metricsv1.NumberDataPoint{
			Attributes:   nil,
			TimeUnixNano: 0,
			Value:        &metricsv1.NumberDataPoint_AsDouble{AsDouble: 0},
		}
	}

	exporter := &PromToOtlpExporter{
		httpClient:         httpClient,
		destination:        opts.Destination,
		transformer:        opts.Transformer,
		resourceAttributes: resourceAttrs,
		stringKVPool:       pool.NewGeneric(4096, newStringKV),
		datapointPool:      pool.NewGeneric(4096, newDataPoint),
	}
	return exporter
}
// Write converts wr into an OTLP export request and POSTs it to the configured
// destination. When no timeseries survives filtering, nothing is sent and nil
// is returned. Conversion errors are wrapped; send errors are returned as-is.
//
// NOTE(review): ctx is accepted but is not passed on to the HTTP request, so
// cancelling it does not abort an in-flight send — confirm whether it should be.
func (c *PromToOtlpExporter) Write(ctx context.Context, wr *prompb.WriteRequest) error {
	payload, kept, convErr := c.promToOtlpRequest(wr)
	switch {
	case convErr != nil:
		return fmt.Errorf("metric otlp forwarder convert: %w", convErr)
	case kept == 0:
		return nil
	}
	return c.sendRequest(payload, kept)
}
// CloseIdleConnections closes idle connections held by the exporter's
// underlying HTTP client by delegating to http.Client.CloseIdleConnections.
func (c *PromToOtlpExporter) CloseIdleConnections() {
c.httpClient.CloseIdleConnections()
}
// promToOtlpRequest converts wr into a serialized OTLP
// ExportMetricsServiceRequest.
//
// Every timeseries the transformer keeps becomes one Gauge metric. All data
// points of a metric share a single attribute slice built from the series
// labels; the __name__ label and labels prefixed with "adxmon_" are not emitted
// as attributes. Attributes and data points are borrowed from the exporter's
// pools and handed back once the request has been marshaled.
//
// It returns the marshaled request, the number of timeseries that were kept,
// and any error from marshaling.
func (c *PromToOtlpExporter) promToOtlpRequest(wr *prompb.WriteRequest) ([]byte, int64, error) {
	scopeMetrics := &metricsv1.ScopeMetrics{
		// TODO - assume we filter most or keep most?
		Metrics: make([]*metricsv1.Metric, 0, len(wr.Timeseries)),
	}
	exportRequest := &v1.ExportMetricsServiceRequest{
		ResourceMetrics: []*metricsv1.ResourceMetrics{
			{
				Resource: &resourcev1.Resource{
					Attributes: c.resourceAttributes,
				},
				ScopeMetrics: []*metricsv1.ScopeMetrics{
					scopeMetrics,
				},
			},
		},
	}

	count := int64(0)
	for _, ts := range wr.Timeseries {
		nameBytes := prompb.MetricName(ts)
		if c.transformer.ShouldDropMetric(ts, nameBytes) {
			continue
		}
		count++

		gauge := &metricsv1.Gauge{
			DataPoints: make([]*metricsv1.NumberDataPoint, 0, len(ts.Samples)),
		}
		metric := &metricsv1.Metric{
			Data: &metricsv1.Metric_Gauge{
				Gauge: gauge,
			},
		}

		attributes := make([]*commonv1.KeyValue, 0, len(ts.Labels))
		c.transformer.WalkLabels(ts, func(k, v []byte) {
			// skip adding the name label and any adxmon_ prefixed labels
			if bytes.Equal(k, nameLabel) || bytes.HasPrefix(k, []byte("adxmon_")) {
				return
			}

			attribute := c.stringKVPool.Get(0).(*commonv1.KeyValue)
			// Explicitly set all fields to reset, but also to avoid allocations for the value
			attribute.Key = string(k)
			// only accept stringval here to avoid allocations
			stringval := attribute.Value.Value.(*commonv1.AnyValue_StringValue)
			stringval.StringValue = string(v)
			attributes = append(attributes, attribute)
		})
		metric.Name = string(nameBytes)

		for _, sample := range ts.Samples {
			datapoint := c.datapointPool.Get(0).(*metricsv1.NumberDataPoint)
			// Explicitly set all fields to reset, but also to avoid allocations for the value
			datapoint.Attributes = attributes
			datapoint.TimeUnixNano = uint64(sample.Timestamp * 1000000) // milliseconds to nanoseconds
			datapoint.StartTimeUnixNano = 0
			datapoint.Exemplars = nil
			datapoint.Flags = 0
			// only accept doubleval here to avoid allocations
			doubleVal := datapoint.Value.(*metricsv1.NumberDataPoint_AsDouble)
			doubleVal.AsDouble = sample.Value
			gauge.DataPoints = append(gauge.DataPoints, datapoint)
		}

		// The release pass below finds attributes through a metric's first data
		// point. A series without samples has no data point, so nothing in the
		// request references its attributes; hand them back right away instead
		// of letting them fall out of the pool.
		if len(ts.Samples) == 0 {
			for _, attribute := range attributes {
				c.stringKVPool.Put(attribute)
			}
		}

		scopeMetrics.Metrics = append(scopeMetrics.Metrics, metric)
	}

	serialized, err := proto.Marshal(exportRequest)

	// Marshaling is done (successfully or not), so the pooled objects can be
	// recycled regardless of err.
	for _, scopeMetric := range scopeMetrics.Metrics {
		for idx, datapoint := range scopeMetric.GetGauge().GetDataPoints() {
			if idx == 0 {
				// shared attribute with all datapoints under metric
				for _, attribute := range datapoint.Attributes {
					c.stringKVPool.Put(attribute)
				}
			}
			c.datapointPool.Put(datapoint)
		}
	}

	return serialized, count, err
}
// sendRequest POSTs body, a serialized OTLP export request, to the configured
// destination and records the outcome in the exporter metrics.
//
// count is the number of timeseries contained in body. On a 200 response the
// reply is decoded; if it reports a partial success, the rejected data points
// are added to the failed metric and the remainder to the sent metric,
// otherwise count is added to the sent metric. Any other status code is
// returned as an error after the response body has been drained.
func (c *PromToOtlpExporter) sendRequest(body []byte, count int64) error {
	req, err := http.NewRequest(http.MethodPost, c.destination, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("metric otlp forwarder request: %w", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("metric otlp forwarder post: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best-effort drain so the connection can be reused; the status is the
		// error that matters to the caller.
		_, _ = io.Copy(io.Discard, resp.Body)
		return fmt.Errorf("metric otlp forwarder post error code: %s", resp.Status)
	}

	buf, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("metric otlp forwarder read response: %w", err)
	}

	var exportMetricsServiceResponse v1.ExportMetricsServiceResponse
	if err := proto.Unmarshal(buf, &exportMetricsServiceResponse); err != nil {
		return fmt.Errorf("metric otlp forwarder unmarshal response: %w", err)
	}

	if !exportMetricsServiceResponse.HasPartialSuccess() {
		metrics.CollectorExporterSent.WithLabelValues("PromToOtlp", c.destination).Add(float64(count))
		return nil
	}

	// count is in timeseries while the server reports rejected data points, and
	// the rejected value comes straight off the wire, so either operand of Add
	// below could end up negative. Clamp both at zero.
	// NOTE(review): presumably these metrics are Prometheus counters, whose Add
	// panics on negative values — verify against the metrics package.
	rejected := exportMetricsServiceResponse.PartialSuccess.RejectedDataPoints
	if rejected < 0 {
		rejected = 0
	}
	accepted := count - rejected
	if accepted < 0 {
		accepted = 0
	}
	metrics.CollectorExporterFailed.WithLabelValues("PromToOtlp", c.destination).Add(float64(rejected))
	metrics.CollectorExporterSent.WithLabelValues("PromToOtlp", c.destination).Add(float64(accepted))
	return nil
}