exporter/elasticsearchexporter/internal/objmodel/objmodel.go (427 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// The objmodel package provides tools for converting OpenTelemetry Log records into
// JSON documents.
//
// The JSON parsing in Elasticsearch does not support parsing JSON documents
// with duplicate fields. The fields in the document can be sort and duplicate entries
// can be removed before serializing. Deduplication ensures that ambiguous
// events can still be indexed.
//
// With attributes map encoded as a list of key value
// pairs, we might find some structured loggers that create log records with
// duplicate fields. Although the AttributeMap wrapper tries to give a
// dictionary like view into the list, it is not 'complete'. When iterating the map
// for encoding, we still will encounter the duplicates.
// The AttributeMap helpers treat the first occurrence as the actual field.
// For high-performance structured loggers (e.g. zap) the AttributeMap
// semantics are not necessarily correct. Most often the last occurrence will be
// what we want to export, as the last occurrence represents the last overwrite
// within a context/dictionary (the leaf-logger its context).
// Some Loggers might even allow users to create a mix of dotted and dedotted fields.
// The Document type also tries to combine these into a proper structure, such that these mixed
// representations have a unique encoding only, which allows us to properly remove duplicates.
//
// The `.` is special to Elasticsearch. In order to handle common prefixes and attributes
// being a mix of key value pairs with dots and complex objects, we flatten the document first
// before we deduplicate. Final dedotting is optional and only required when
// Ingest Node is used. But either way, we try to present only well formed
// document to Elasticsearch.
package objmodel // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
import (
"encoding/hex"
"io"
"maps"
"math"
"sort"
"strings"
"time"
"github.com/elastic/go-structform"
"github.com/elastic/go-structform/json"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)
// Document is an intermediate representation for converting open telemetry records with arbitrary attributes
// into a JSON document that can be processed by Elasticsearch.
type Document struct {
fields []field
dynamicTemplates map[string]string
}
type field struct {
key string
value Value
}
// Value type that can be added to a Document.
type Value struct {
kind Kind
ui uint64
i int64
dbl float64
str string
arr []Value
doc Document
ts time.Time
}
// Kind represent the internal kind of a value stored in a Document.
type Kind uint8
// Enum values for Kind.
const (
KindNil Kind = iota
KindBool
KindInt
KindUInt
KindDouble
KindString
KindArr
KindObject
KindTimestamp
KindIgnore
KindUnflattenableObject // Unflattenable object is an object that should not be flattened at serialization time
)
const tsLayout = "2006-01-02T15:04:05.000000000Z"
var (
nilValue = Value{kind: KindNil}
ignoreValue = Value{kind: KindIgnore}
)
// DocumentFromAttributes creates a document from a OpenTelemetry attribute
// map. All nested maps will be flattened, with keys being joined using a `.` symbol.
func DocumentFromAttributes(am pcommon.Map) Document {
return DocumentFromAttributesWithPath("", am)
}
// DocumentFromAttributesWithPath creates a document from a OpenTelemetry attribute
// map. All nested maps will be flattened, with keys being joined using a `.` symbol.
//
// All keys in the map will be prefixed with path.
func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document {
if am.Len() == 0 {
return Document{}
}
fields := make([]field, 0, am.Len())
fields = appendAttributeFields(fields, path, am)
return Document{fields: fields}
}
func (doc *Document) Clone() *Document {
fields := make([]field, len(doc.fields))
copy(fields, doc.fields)
return &Document{fields: fields, dynamicTemplates: maps.Clone(doc.dynamicTemplates)}
}
func (doc *Document) AddDynamicTemplate(path, template string) {
if doc.dynamicTemplates == nil {
doc.dynamicTemplates = make(map[string]string)
}
doc.dynamicTemplates[path] = template
}
func (doc *Document) DynamicTemplates() map[string]string {
return doc.dynamicTemplates
}
// AddTimestamp adds a raw timestamp value to the Document.
func (doc *Document) AddTimestamp(key string, ts pcommon.Timestamp) {
doc.Add(key, TimestampValue(ts.AsTime()))
}
// Add adds a converted value to the document.
func (doc *Document) Add(key string, v Value) {
doc.fields = append(doc.fields, field{key: key, value: v})
}
// AddString adds a string to the document.
func (doc *Document) AddString(key string, v string) {
if v != "" {
doc.Add(key, StringValue(v))
}
}
// AddSpanID adds the hex presentation of a SpanID to the document. If the SpanID
// is empty, no value will be added.
func (doc *Document) AddSpanID(key string, id pcommon.SpanID) {
if !id.IsEmpty() {
doc.AddString(key, hex.EncodeToString(id[:]))
}
}
// AddTraceID adds the hex presentation of a TraceID value to the document. If the TraceID
// is empty, no value will be added.
func (doc *Document) AddTraceID(key string, id pcommon.TraceID) {
if !id.IsEmpty() {
doc.AddString(key, hex.EncodeToString(id[:]))
}
}
// AddInt adds an integer value to the document.
func (doc *Document) AddInt(key string, value int64) {
doc.Add(key, IntValue(value))
}
// AddUInt adds an unsigned integer value to the document.
func (doc *Document) AddUInt(key string, value uint64) {
doc.Add(key, UIntValue(value))
}
// AddAttributes expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map) {
doc.fields = appendAttributeFields(doc.fields, key, attributes)
}
// AddAttribute converts and adds a AttributeValue to the document. If the attribute represents a map,
// the fields will be flattened.
func (doc *Document) AddAttribute(key string, attribute pcommon.Value) {
switch attribute.Type() {
case pcommon.ValueTypeEmpty:
// do not add 'null'
case pcommon.ValueTypeMap:
doc.AddAttributes(key, attribute.Map())
default:
doc.Add(key, ValueFromAttribute(attribute))
}
}
// AddEvents converts and adds span events to the document.
func (doc *Document) AddEvents(key string, events ptrace.SpanEventSlice) {
for i := 0; i < events.Len(); i++ {
e := events.At(i)
doc.AddTimestamp(flattenKey(key, e.Name()+".time"), e.Timestamp())
doc.AddAttributes(flattenKey(key, e.Name()), e.Attributes())
}
}
func (doc *Document) sort() {
sort.SliceStable(doc.fields, func(i, j int) bool {
return doc.fields[i].key < doc.fields[j].key
})
for i := range doc.fields {
fld := &doc.fields[i]
fld.value.sort()
}
}
// Dedup removes fields from the document, that have duplicate keys.
// The filtering only keeps the last value for a key.
//
// Dedup ensure that keys are sorted.
func (doc *Document) Dedup() {
// 1. Always ensure the fields are sorted, Dedup support requires
// Fields to be sorted.
doc.sort()
// 2. rename fields if a primitive value is overwritten by an object.
// For example the pair (path.x=1, path.x.a="test") becomes:
// (path.x.value=1, path.x.a="test").
//
// NOTE: We do the renaming, in order to preserve the original value
// in case of conflicts after dedotting, which would lead to the removal of the field.
// For example docker/k8s labels tend to use `.`, which need to be handled in case
// The collector does pass us these kind of labels as an AttributeMap.
//
// NOTE: If the embedded document already has a field name `value`, we will remove the renamed
// field in favor of the `value` field in the document.
//
// This step removes potential conflicts when dedotting and serializing fields.
var renamed bool
for i := 0; i < len(doc.fields)-1; i++ {
key, nextKey := doc.fields[i].key, doc.fields[i+1].key
if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
renamed = true
doc.fields[i].key = key + ".value"
}
}
if renamed {
doc.sort()
}
// 3. mark duplicates as 'ignore'
//
// This step ensures that we do not have duplicate fields names when serializing.
// Elasticsearch JSON parser will fail otherwise.
for i := 0; i < len(doc.fields)-1; i++ {
if doc.fields[i].key == doc.fields[i+1].key {
doc.fields[i].value = ignoreValue
}
}
// 4. fix objects that might be stored in arrays
for i := range doc.fields {
doc.fields[i].value.Dedup()
}
}
func newJSONVisitor(w io.Writer) *json.Visitor {
v := json.NewVisitor(w)
// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
// This is required to generate the correct dynamic mapping in ES.
v.SetExplicitRadixPoint(true)
return v
}
// Serialize writes the document to the given writer. The document fields will be
// deduplicated and, if dedot is true, turned into nested objects prior to
// serialization.
func (doc *Document) Serialize(w io.Writer, dedot bool) error {
doc.Dedup()
v := newJSONVisitor(w)
return doc.iterJSON(v, dedot)
}
func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error {
if dedot {
return doc.iterJSONDedot(v)
}
return doc.iterJSONFlat(v)
}
func (doc *Document) iterJSONFlat(w *json.Visitor) error {
err := w.OnObjectStart(-1, structform.AnyType)
if err != nil {
return err
}
defer func() {
_ = w.OnObjectFinished()
}()
for i := range doc.fields {
fld := &doc.fields[i]
if fld.value.IsEmpty() {
continue
}
if err := w.OnKey(fld.key); err != nil {
return err
}
if err := fld.value.iterJSON(w, true); err != nil {
return err
}
}
return nil
}
func (doc *Document) iterJSONDedot(w *json.Visitor) error {
objPrefix := ""
level := 0
if err := w.OnObjectStart(-1, structform.AnyType); err != nil {
return err
}
defer func() {
_ = w.OnObjectFinished()
}()
for i := range doc.fields {
fld := &doc.fields[i]
if fld.value.IsEmpty() {
continue
}
key := fld.key
// decrease object level until last reported and current key have the same path prefix
for L := commonObjPrefix(key, objPrefix); L < len(objPrefix); {
for L > 0 && key[L-1] != '.' {
L--
}
// remove levels and append write list of outstanding '}' into the writer
if L > 0 {
for delta := objPrefix[L:]; len(delta) > 0; {
idx := strings.IndexByte(delta, '.')
if idx < 0 {
break
}
delta = delta[idx+1:]
level--
if err := w.OnObjectFinished(); err != nil {
return err
}
}
objPrefix = key[:L]
} else { // no common prefix, close all objects we reported so far.
for ; level > 0; level-- {
if err := w.OnObjectFinished(); err != nil {
return err
}
}
objPrefix = ""
}
}
// increase object level up to current field
for {
start := len(objPrefix)
idx := strings.IndexByte(key[start:], '.')
if idx < 0 {
break
}
level++
objPrefix = key[:len(objPrefix)+idx+1]
fieldName := key[start : start+idx]
if err := w.OnKey(fieldName); err != nil {
return err
}
if err := w.OnObjectStart(-1, structform.AnyType); err != nil {
return err
}
}
// report value
fieldName := key[len(objPrefix):]
if err := w.OnKey(fieldName); err != nil {
return err
}
if err := fld.value.iterJSON(w, true); err != nil {
return err
}
}
// close all pending object levels
for ; level > 0; level-- {
if err := w.OnObjectFinished(); err != nil {
return err
}
}
return nil
}
// StringValue create a new value from a string.
func StringValue(str string) Value { return Value{kind: KindString, str: str} }
// IntValue creates a new value from an integer.
func IntValue(i int64) Value { return Value{kind: KindInt, i: i} }
// UIntValue creates a new value from an unsigned integer.
func UIntValue(i uint64) Value { return Value{kind: KindUInt, ui: i} }
// DoubleValue creates a new value from a double value..
func DoubleValue(d float64) Value { return Value{kind: KindDouble, dbl: d} }
// BoolValue creates a new value from a double value..
func BoolValue(b bool) Value {
var v uint64
if b {
v = 1
}
return Value{kind: KindBool, ui: v}
}
// ArrValue combines multiple values into an array value.
func ArrValue(values ...Value) Value {
return Value{kind: KindArr, arr: values}
}
// TimestampValue create a new value from a time.Time.
func TimestampValue(ts time.Time) Value {
return Value{kind: KindTimestamp, ts: ts}
}
// UnflattenableObjectValue creates a unflattenable object from a map
func UnflattenableObjectValue(m pcommon.Map) Value {
sub := DocumentFromAttributes(m)
return Value{kind: KindUnflattenableObject, doc: sub}
}
// ValueFromAttribute converts a AttributeValue into a value.
func ValueFromAttribute(attr pcommon.Value) Value {
switch attr.Type() {
case pcommon.ValueTypeInt:
return IntValue(attr.Int())
case pcommon.ValueTypeDouble:
return DoubleValue(attr.Double())
case pcommon.ValueTypeStr:
return StringValue(attr.Str())
case pcommon.ValueTypeBool:
return BoolValue(attr.Bool())
case pcommon.ValueTypeSlice:
sub := arrFromAttributes(attr.Slice())
return ArrValue(sub...)
case pcommon.ValueTypeMap:
sub := DocumentFromAttributes(attr.Map())
return Value{kind: KindObject, doc: sub}
default:
return nilValue
}
}
func (v *Value) sort() {
switch v.kind {
case KindObject:
v.doc.sort()
case KindArr:
for i := range v.arr {
v.arr[i].sort()
}
}
}
// Dedup recursively dedups keys in stored documents.
//
// NOTE: The value MUST be sorted.
func (v *Value) Dedup() {
switch v.kind {
case KindObject:
v.doc.Dedup()
case KindArr:
for i := range v.arr {
v.arr[i].Dedup()
}
}
}
func (v *Value) IsEmpty() bool {
switch v.kind {
case KindNil, KindIgnore:
return true
case KindArr:
return len(v.arr) == 0
case KindObject:
return len(v.doc.fields) == 0
default:
return false
}
}
func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
switch v.kind {
case KindNil:
return w.OnNil()
case KindBool:
return w.OnBool(v.ui == 1)
case KindInt:
return w.OnInt64(v.i)
case KindUInt:
return w.OnUint64(v.ui)
case KindDouble:
if math.IsNaN(v.dbl) || math.IsInf(v.dbl, 0) {
// NaN and Inf are undefined for JSON. Let's serialize to "null"
return w.OnNil()
}
return w.OnFloat64(v.dbl)
case KindString:
return w.OnString(v.str)
case KindTimestamp:
str := v.ts.UTC().Format(tsLayout)
return w.OnString(str)
case KindObject:
if len(v.doc.fields) == 0 {
return w.OnNil()
}
return v.doc.iterJSON(w, dedot)
case KindUnflattenableObject:
if len(v.doc.fields) == 0 {
return w.OnNil()
}
return v.doc.iterJSON(w, true)
case KindArr:
if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
return err
}
for i := range v.arr {
if err := v.arr[i].iterJSON(w, dedot); err != nil {
return err
}
}
if err := w.OnArrayFinished(); err != nil {
return err
}
}
return nil
}
func arrFromAttributes(aa pcommon.Slice) []Value {
if aa.Len() == 0 {
return nil
}
values := make([]Value, aa.Len())
for i := 0; i < aa.Len(); i++ {
values[i] = ValueFromAttribute(aa.At(i))
}
return values
}
func appendAttributeFields(fields []field, path string, am pcommon.Map) []field {
for k, val := range am.All() {
fields = appendAttributeValue(fields, path, k, val)
}
return fields
}
func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value) []field {
if attr.Type() == pcommon.ValueTypeEmpty {
return fields
}
if attr.Type() == pcommon.ValueTypeMap {
return appendAttributeFields(fields, flattenKey(path, key), attr.Map())
}
return append(fields, field{
key: flattenKey(path, key),
value: ValueFromAttribute(attr),
})
}
func flattenKey(path, key string) string {
if path == "" {
return key
}
return path + "." + key
}
func commonObjPrefix(a, b string) int {
end := len(a)
if alt := len(b); alt < end {
end = alt
}
for i := 0; i < end; i++ {
if a[i] != b[i] {
return i
}
}
return end
}