collector/otlp/logs_transfer.go (265 lines of code) (raw):
package otlp
import (
"context"
"errors"
"io"
"log/slog"
"net/http"
"strconv"
"sync"
v1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/logs/v1"
commonv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/common/v1"
"github.com/Azure/adx-mon/collector/logs/engine"
"github.com/Azure/adx-mon/collector/logs/types"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/otlp"
gbp "github.com/libp2p/go-buffer-pool"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/proto"
)
// LogsServiceOpts carries the dependencies required to construct a LogsService.
type LogsServiceOpts struct {
	// WorkerCreator builds the background worker that consumes the
	// service's output queue (started in Open).
	WorkerCreator engine.WorkerCreatorFunc
	// HealthChecker gates request admission: when IsHealthy reports false,
	// Handler sheds load with 429 responses.
	HealthChecker interface{ IsHealthy() bool }
}
// LogsService accepts OTLP/HTTP log export requests, converts them into
// LogBatches, and hands them to a worker through an internal queue.
type LogsService struct {
	workerCreator engine.WorkerCreatorFunc
	// outputQueue is the buffered hand-off between Handler and the worker
	// goroutine started in Open; closed by Close.
	outputQueue chan *types.LogBatch
	logger      *slog.Logger
	healthChecker interface{ IsHealthy() bool }
	// wg tracks the worker goroutine so Close can wait for it to exit.
	wg sync.WaitGroup
}
// NewLogsService constructs a LogsService from opts. Open must be called
// before the service will drain its queue; Handler may then be registered
// as the /v1/logs HTTP endpoint.
func NewLogsService(opts LogsServiceOpts) *LogsService {
	// Tag every log line emitted by this handler with its protocol.
	handlerLogger := slog.Default().With(
		slog.Group(
			"handler",
			slog.String("protocol", "otlp"),
		),
	)
	svc := &LogsService{
		workerCreator: opts.WorkerCreator,
		outputQueue:   make(chan *types.LogBatch, 16),
		logger:        handlerLogger,
		healthChecker: opts.HealthChecker,
	}
	return svc
}
// Open starts the background worker that consumes the output queue.
// The ctx parameter is accepted for interface symmetry but is not used
// to bound the worker's lifetime; call Close to stop the service.
func (s *LogsService) Open(ctx context.Context) error {
	w := s.workerCreator("otlp-logs", s.outputQueue)
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		w.Run()
	}()
	return nil
}
// Close shuts the service down: it closes the output queue (signalling the
// worker to finish) and blocks until the worker goroutine started in Open
// has exited. Close must not be called while Handler may still enqueue
// batches — sending on the closed channel would panic.
func (s *LogsService) Close() error {
	close(s.outputQueue)
	s.wg.Wait()
	return nil
}
// Handler services OTLP/HTTP log export requests on /v1/logs. Only
// protobuf-encoded payloads (Content-Type: application/x-protobuf) are
// accepted; JSON transcoding is not implemented and is rejected with 415.
// Per the OTLP/HTTP spec, partial failures are still reported with 200 and
// a PartialSuccess body.
func (s *LogsService) Handler(w http.ResponseWriter, r *http.Request) {
	m := metrics.RequestsReceived.MustCurryWith(prometheus.Labels{"path": "/v1/logs"})
	defer r.Body.Close()

	// Shed load while unhealthy so senders back off and retry later.
	if !s.healthChecker.IsHealthy() {
		m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
		http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
		return
	}

	switch r.Header.Get("Content-Type") {
	case "application/x-protobuf":
		// A declared Content-Length is required: the pooled read buffer is
		// sized from it. ContentLength is -1 when unknown (e.g. chunked
		// transfer encoding), and passing a negative size to gbp.Get would
		// panic, so reject such requests explicitly.
		if r.ContentLength < 0 {
			w.WriteHeader(http.StatusLengthRequired)
			m.WithLabelValues(strconv.Itoa(http.StatusLengthRequired)).Inc()
			return
		}

		// Consume the request body and unmarshal it into a protobuf message.
		b := gbp.Get(int(r.ContentLength))
		defer gbp.Put(b)
		n, err := io.ReadFull(r.Body, b)
		if err != nil {
			// io.ReadFull reports a truncated body via ErrUnexpectedEOF (or
			// EOF when nothing was read); that's a client error, not a 500.
			if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
				s.logger.Warn("Short read")
				w.WriteHeader(http.StatusBadRequest)
				m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
				return
			}
			s.logger.Error("Failed to read request body", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}
		b = b[:n]
		if logger.IsDebug() {
			s.logger.Debug("Received request body", "Bytes", n)
		}

		msg := &v1.ExportLogsServiceRequest{}
		if err := proto.Unmarshal(b, msg); err != nil {
			s.logger.Error("Failed to unmarshal request body", "Error", err)
			w.WriteHeader(http.StatusBadRequest)
			m.WithLabelValues(strconv.Itoa(http.StatusBadRequest)).Inc()
			return
		}

		logBatch := types.LogBatchPool.Get(1).(*types.LogBatch)
		logBatch.Reset()
		droppedLogMissingMetadata := s.convertToLogBatch(msg, logBatch)
		s.outputQueue <- logBatch

		// The logs have been committed by the OTLP endpoint
		resp := &v1.ExportLogsServiceResponse{}
		if droppedLogMissingMetadata > 0 {
			// NOTE(review): this message names kube.* attributes while the
			// conversion checks Kusto metadata — confirm the attribute names
			// before changing the user-visible text.
			resp.SetPartialSuccess(&v1.ExportLogsPartialSuccess{
				RejectedLogRecords: droppedLogMissingMetadata,
				ErrorMessage:       "Logs lacking kube.database and kube.table attributes or body fields",
			})
			metrics.InvalidLogsDropped.WithLabelValues().Add(float64(droppedLogMissingMetadata))
		}
		respBodyBytes, err := proto.Marshal(resp)
		if err != nil {
			s.logger.Error("Failed to marshal response", "Error", err)
			w.WriteHeader(http.StatusInternalServerError)
			m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
			return
		}
		// Even in the case of a partial response, OTLP API requires us to send StatusOK
		w.Header().Add("Content-Type", "application/x-protobuf")
		w.WriteHeader(http.StatusOK)
		w.Write(respBodyBytes)
		m.WithLabelValues(strconv.Itoa(http.StatusOK)).Inc()
	case "application/json":
		// JSON -> protobuf transcoding for the OTLP endpoint is not
		// implemented; reject explicitly rather than misparse.
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()
	default:
		logger.Errorf("Unsupported Content-Type: %s", r.Header.Get("Content-Type"))
		w.WriteHeader(http.StatusUnsupportedMediaType)
		m.WithLabelValues(strconv.Itoa(http.StatusUnsupportedMediaType)).Inc()
	}
}
var (
	// ErrMissingKustoMetadata indicates log records lacking the Kusto
	// database/table routing attributes. Not referenced in this file —
	// presumably returned by other code in the package; verify before removal.
	ErrMissingKustoMetadata = errors.New("missing kusto metadata")
	// ErrMalformedLogs indicates log records that could not be parsed.
	// Also unreferenced here — confirm usage elsewhere in the package.
	ErrMalformedLogs = errors.New("malformed log records")
)
// convertToLogBatch populates logBatch with the log records carried by the
// OTLP export request msg. Records missing the Kusto routing metadata
// (database and table names) are skipped; the number of skipped records is
// returned so the caller can report a partial success.
func (s *LogsService) convertToLogBatch(msg *v1.ExportLogsServiceRequest, logBatch *types.LogBatch) int64 {
	if msg == nil {
		return 0
	}
	var dropped int64
	for _, rl := range msg.ResourceLogs {
		if rl == nil {
			continue
		}
		// Resource-level attributes apply to every record under this resource.
		resourceAttrs := map[string]any{}
		if rl.Resource != nil {
			extractKeyValues(rl.Resource.Attributes, func(k string, v any) {
				resourceAttrs[k] = v
			})
		}
		for _, scope := range rl.ScopeLogs {
			if scope == nil {
				continue
			}
			for _, rec := range scope.LogRecords {
				if rec == nil {
					continue
				}
				// Senders are required to include Kusto metadata (database
				// and table names) in the record attributes or body.
				db, table := otlp.KustoMetadata(rec)
				if db == "" || table == "" {
					if logger.IsDebug() {
						s.logger.Warn("Missing Kusto metadata", "Payload", rec.String())
					}
					dropped++
					continue
				}
				entry := types.LogPool.Get(1).(*types.Log)
				entry.Reset()
				entry.SetTimestamp(rec.TimeUnixNano)
				entry.SetObservedTimestamp(rec.ObservedTimeUnixNano)
				extractKeyValues(rec.Attributes, entry.SetAttributeValue)
				extractBody(rec.Body, entry.SetBodyValue)
				for k, v := range resourceAttrs {
					entry.SetResourceValue(k, v)
				}
				// Routing attributes are written last so they take precedence
				// over any identically-named record attributes.
				entry.SetAttributeValue(types.AttributeDatabaseName, db)
				entry.SetAttributeValue(types.AttributeTableName, table)
				metrics.LogKeys.WithLabelValues(db, table).Inc()
				logBatch.Logs = append(logBatch.Logs, entry)
			}
		}
	}
	return dropped
}
// defaultMaxDepth bounds recursion when flattening nested OTLP values.
const defaultMaxDepth = 20

// extractBody flattens an OTLP log body into dest. A kv-list body is
// expanded entry by entry; any other value kind is delivered under the
// generic message body key. Empty or nil bodies are ignored.
func extractBody(body *commonv1.AnyValue, dest func(k string, v any)) {
	if body == nil || !body.HasValue() {
		return
	}
	if !body.HasKvlistValue() {
		// Scalar/array body: store it whole under the message key.
		if v, ok := extract(body, 0, defaultMaxDepth); ok {
			dest(types.BodyKeyMessage, v)
		}
		return
	}
	if kvs := body.GetKvlistValue(); kvs != nil {
		extractKeyValues(kvs.Values, dest)
	}
}
// extractKeyValues walks kvs and invokes dest for each key whose value can
// be converted to a native Go value; nil entries and unconvertible values
// are skipped. A nil slice is a no-op.
func extractKeyValues(kvs []*commonv1.KeyValue, dest func(k string, v any)) {
	for _, kv := range kvs {
		if kv == nil {
			continue
		}
		if v, ok := extract(kv.Value, 0, defaultMaxDepth); ok {
			dest(kv.Key, v)
		}
	}
}
// extract converts an OTLP AnyValue into a native Go value. Arrays and
// kv-lists are converted recursively; once depth exceeds maxdepth the
// remaining subtree is replaced by the placeholder string "...". ok reports
// whether a usable value was produced (false for nil or unknown kinds).
func extract(val *commonv1.AnyValue, depth int, maxdepth int) (value any, ok bool) {
	if val == nil {
		return nil, false
	}
	if depth > maxdepth {
		// Truncate pathologically deep payloads instead of recursing forever.
		return "...", true
	}
	switch val.Value.(type) {
	case *commonv1.AnyValue_StringValue:
		return val.GetStringValue(), true
	case *commonv1.AnyValue_BoolValue:
		return val.GetBoolValue(), true
	case *commonv1.AnyValue_IntValue:
		return val.GetIntValue(), true
	case *commonv1.AnyValue_DoubleValue:
		return val.GetDoubleValue(), true
	case *commonv1.AnyValue_BytesValue:
		return val.GetBytesValue(), true
	case *commonv1.AnyValue_ArrayValue:
		arr := val.GetArrayValue()
		if arr == nil {
			return nil, false
		}
		out := make([]any, 0, len(arr.Values))
		for _, elem := range arr.Values {
			if v, ok := extract(elem, depth+1, maxdepth); ok {
				out = append(out, v)
			}
		}
		return out, true
	case *commonv1.AnyValue_KvlistValue:
		kvs := val.GetKvlistValue()
		if kvs == nil {
			return nil, false
		}
		out := map[string]any{}
		for _, kv := range kvs.Values {
			if kv == nil || !kv.HasValue() {
				continue
			}
			if v, ok := extract(kv.Value, depth+1, maxdepth); ok {
				out[kv.Key] = v
			}
		}
		return out, true
	default:
		return nil, false
	}
}