model/modelprocessor/datastream.go (135 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package modelprocessor
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/elastic/apm-data/model/modelpb"
)
const (
logsType = "logs"
logDefaultServiceName = "unknown"
appLogsDataset = "apm.app"
errorsDataset = "apm.error"
metricsType = "metrics"
appMetricsDataset = "apm.app"
internalMetricsDataset = "apm.internal"
tracesType = "traces"
tracesDataset = "apm"
rumTracesDataset = "apm.rum"
)
// SetDataStream is a modelpb.BatchProcessor that routes events to the appropriate
// data streams.
type SetDataStream struct {
Namespace string
}
// ProcessBatch sets data stream fields for each event in b.
func (s *SetDataStream) ProcessBatch(ctx context.Context, b *modelpb.Batch) error {
for i := range *b {
if (*b)[i].DataStream == nil {
(*b)[i].DataStream = &modelpb.DataStream{}
}
if (*b)[i].DataStream.Namespace == "" || isRUMAgentName((*b)[i].GetAgent().GetName()) {
// Only set namespace if
// 1. it is not already set in the input event; OR
// 2. it is from RUM agents, so that they cannot create arbitrarily many data streams
(*b)[i].DataStream.Namespace = s.Namespace
}
if (*b)[i].DataStream.Type == "" || (*b)[i].DataStream.Dataset == "" {
s.setDataStream((*b)[i])
}
}
return nil
}
func (s *SetDataStream) setDataStream(event *modelpb.APMEvent) {
switch event.Type() {
case modelpb.SpanEventType, modelpb.TransactionEventType:
event.DataStream.Type = tracesType
if event.DataStream.Dataset == "" {
// Only set dataset if it is not already set in the input event
event.DataStream.Dataset = tracesDataset
}
// In order to maintain different ILM policies, RUM traces are sent to
// a different datastream.
// RUM agents should not be able to configure dataset.
if isRUMAgentName(event.GetAgent().GetName()) {
event.DataStream.Dataset = rumTracesDataset
}
case modelpb.ErrorEventType:
event.DataStream.Type = logsType
event.DataStream.Dataset = errorsDataset
case modelpb.LogEventType:
event.DataStream.Type = logsType
if event.DataStream.Dataset == "" || isRUMAgentName(event.GetAgent().GetName()) {
// Only set dataset if it is not already set in the input event
event.DataStream.Dataset = getAppLogsDataset(event)
}
case modelpb.MetricEventType:
event.DataStream.Type = metricsType
if event.DataStream.Dataset == "" || isRUMAgentName(event.GetAgent().GetName()) {
// Only set dataset if it is not already set in the input event
event.DataStream.Dataset = metricsetDataset(event)
}
}
}
func isRUMAgentName(agentName string) bool {
switch agentName {
// These are all the known agents that send "RUM" data to the APM Server.
case "rum-js", "js-base", "iOS/swift":
return true
}
return false
}
func getAppLogsDataset(event *modelpb.APMEvent) string {
serviceName := event.GetService().GetName()
if serviceName == "" {
serviceName = logDefaultServiceName
}
var dataset strings.Builder
dataset.WriteString(appLogsDataset)
dataset.WriteByte('.')
dataset.WriteString(normalizeServiceName(serviceName))
return dataset.String()
}
func metricsetDataset(event *modelpb.APMEvent) string {
if event.Transaction != nil || event.Span != nil || event.GetService().GetName() == "" || event.GetMetricset().GetName() == "service_summary" {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from custom application metrics.
// If events contain the `metricset.interval` field, the dataset will
// be formatted as: `apm.${metricset.name}.${metricset.interval}.
if ms := event.Metricset; ms.Interval != "" {
return fmt.Sprintf("apm.%s.%s", ms.Name, ms.Interval)
}
return internalMetricsDataset
}
if event.Metricset != nil {
// Well-defined system/runtime metrics are stored in the shared
// "internal" metrics data stream, as they are not application-specific.
//
// TODO(axw) agents should indicate that a metricset is internal.
// If a metricset is identified as internal, then we'll ignore any
// metrics that we don't already know about; otherwise they will end
// up creating service-specific data streams.
internal := true
// set internal to false for metrics translated using OTel remappers.
if label, ok := event.Labels["otel_remapped"]; ok && label != nil {
remapped, _ := strconv.ParseBool(label.Value)
internal = !remapped
}
if internal {
for _, s := range event.Metricset.Samples {
if !IsInternalMetricName(s.Name) {
internal = false
break
}
}
}
if internal {
// The internal metrics data stream does not use dynamic
// mapping, so we must drop type and unit if specified.
for i, sample := range event.Metricset.Samples {
if sample.Type == modelpb.MetricType_METRIC_TYPE_UNSPECIFIED && sample.Unit == "" {
continue
}
sample.Type = modelpb.MetricType_METRIC_TYPE_UNSPECIFIED
sample.Unit = ""
event.Metricset.Samples[i] = sample
}
return internalMetricsDataset
}
}
// All other metrics are assumed to be application-specific metrics,
// and so will be stored in an application-specific data stream.
suffix := normalizeServiceName(event.Service.Name)
var dataset strings.Builder
dataset.Grow(len(appMetricsDataset) + 1 + len(suffix))
dataset.WriteString(appMetricsDataset)
dataset.WriteByte('.')
dataset.WriteString(suffix)
return dataset.String()
}
// normalizeServiceName translates serviceName into a string suitable
// for inclusion in a data stream name.
//
// Concretely, this function will lowercase the string and replace any
// reserved characters with "_".
func normalizeServiceName(s string) string {
s = strings.ToLower(s)
s = strings.Map(replaceReservedRune, s)
return s
}
func replaceReservedRune(r rune) rune {
switch r {
case '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':':
// These characters are not permitted in data stream names
// by Elasticsearch.
return '_'
case '-':
// Hyphens are used to separate the data stream type, dataset,
// and namespace.
return '_'
}
return r
}