receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go (322 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" import ( "crypto/rand" "encoding/binary" "encoding/hex" "encoding/json" "errors" "fmt" "hash/fnv" "slices" "strings" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" trc "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" ) // FieldMapConfig is used to map the fields from the LibhoneyEvent to PData formats type FieldMapConfig struct { Resources ResourcesConfig `mapstructure:"resources"` Scopes ScopesConfig `mapstructure:"scopes"` Attributes AttributesConfig `mapstructure:"attributes"` } // ResourcesConfig is used to map the fields from the LibhoneyEvent to PData formats type ResourcesConfig struct { ServiceName string `mapstructure:"service_name"` } // ScopesConfig is used to map the fields from the LibhoneyEvent to PData formats type ScopesConfig struct { LibraryName string `mapstructure:"library_name"` LibraryVersion string `mapstructure:"library_version"` } // AttributesConfig is used to map the fields from the LibhoneyEvent to PData formats type AttributesConfig struct { TraceID string `mapstructure:"trace_id"` ParentID string `mapstructure:"parent_id"` SpanID string `mapstructure:"span_id"` Name string `mapstructure:"name"` Error string `mapstructure:"error"` SpanKind string `mapstructure:"spankind"` DurationFields []string `mapstructure:"durationFields"` } // LibhoneyEvent is the event structure from libhoney type LibhoneyEvent struct { Samplerate int `json:"samplerate" msgpack:"samplerate"` MsgPackTimestamp *time.Time `msgpack:"time"` Time string `json:"time"` // should not be trusted. use MsgPackTimestamp Data map[string]any `json:"data" msgpack:"data"` } // UnmarshalJSON overrides the unmarshall to make sure the MsgPackTimestamp is set func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { type _libhoneyEvent LibhoneyEvent tstr := eventtime.GetEventTimeDefaultString() tzero := time.Time{} tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} err := json.Unmarshal(j, &tmp) if err != nil { return err } if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { // neither timestamp was set. give it right now. tmp.Time = tstr tnow := time.Now() tmp.MsgPackTimestamp = &tnow } if tmp.MsgPackTimestamp.IsZero() { propertime := eventtime.GetEventTime(tmp.Time) tmp.MsgPackTimestamp = &propertime } *l = LibhoneyEvent(tmp) return nil } // DebugString returns a string representation of the LibhoneyEvent func (l *LibhoneyEvent) DebugString() string { return fmt.Sprintf("%#v", l) } // SignalType returns the type of signal this event represents. Only log is implemented for now. func (l *LibhoneyEvent) SignalType(logger zap.Logger) string { if sig, ok := l.Data["meta.signal_type"]; ok { switch sig { case "trace": if atype, ok := l.Data["meta.annotation_type"]; ok { switch atype { case "span_event": return "span_event" case "link": return "span_link" } logger.Warn("invalid annotation type", zap.String("meta.annotation_type", atype.(string))) return "span" } return "span" case "log": return "log" default: logger.Warn("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string))) return "log" } } logger.Warn("missing meta.signal_type and meta.annotation_type") return "log" } // GetService returns the service name from the event or the dataset name if no service name is found. func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok { seen.NameCount[serviceName.(string)]++ return serviceName.(string), nil } return dataset, errors.New("no service.name found in event") } // GetScope returns the scope key for the event. If the scope has not been seen before, it creates a new one. func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok { scopeKey := serviceName + scopeLibraryName.(string) if _, ok := seen.Scope[scopeKey]; ok { // if we've seen it, we don't expect it to be different right away so we'll just return it. return scopeKey, nil } // otherwise, we need to make a new found scope scopeLibraryVersion := "unset" if scopeLibVer, ok := l.Data[fields.Scopes.LibraryVersion]; ok { scopeLibraryVersion = scopeLibVer.(string) } newScope := SimpleScope{ ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. LibraryName: scopeLibraryName.(string), LibraryVersion: scopeLibraryVersion, ScopeSpans: ptrace.NewSpanSlice(), ScopeLogs: plog.NewLogRecordSlice(), } seen.Scope[scopeKey] = newScope return scopeKey, nil } return "libhoney.receiver", errors.New("library name not found") } func spanIDFrom(s string) trc.SpanID { hash := fnv.New64a() hash.Write([]byte(s)) n := hash.Sum64() sid := trc.SpanID{} binary.LittleEndian.PutUint64(sid[:], n) return sid } func traceIDFrom(s string) trc.TraceID { hash := fnv.New64a() hash.Write([]byte(s)) n1 := hash.Sum64() hash.Write([]byte(s)) n2 := hash.Sum64() tid := trc.TraceID{} binary.LittleEndian.PutUint64(tid[:], n1) binary.LittleEndian.PutUint64(tid[8:], n2) return tid } func generateAnID(length int) []byte { token := make([]byte, length) _, err := rand.Read(token) if err != nil { return []byte{} } return token } // SimpleScope is a simple struct to hold the scope data type SimpleScope struct { ServiceName string LibraryName string LibraryVersion string ScopeSpans ptrace.SpanSlice ScopeLogs plog.LogRecordSlice } // ScopeHistory is a map of scope keys to the SimpleScope object type ScopeHistory struct { Scope map[string]SimpleScope // key here is service.name+library.name } // ServiceHistory is a map of service names to the number of times they've been seen type ServiceHistory struct { NameCount map[string]int } // ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error { timeNs := l.MsgPackTimestamp.UnixNano() logger.Debug("processing log with", zap.Int64("timestamp", timeNs)) newLog.SetTimestamp(pcommon.Timestamp(timeNs)) if logSevCode, ok := l.Data["severity_code"]; ok { logSevInt := int32(logSevCode.(int64)) newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) } if logSevText, ok := l.Data["severity_text"]; ok { newLog.SetSeverityText(logSevText.(string)) } if logFlags, ok := l.Data["flags"]; ok { logFlagsUint := uint32(logFlags.(uint64)) newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) } // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 if logBody, ok := l.Data["body"]; ok { newLog.Body().SetStr(logBody.(string)) } newLog.Attributes().PutInt("SampleRate", int64(l.Samplerate)) logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} for k, v := range l.Data { if slices.Contains(*alreadyUsedFields, k) { continue } if slices.Contains(logFieldsAlready, k) { continue } switch v := v.(type) { case string: newLog.Attributes().PutStr(k, v) case int: newLog.Attributes().PutInt(k, int64(v)) case int64, int16, int32: intv := v.(int64) newLog.Attributes().PutInt(k, intv) case float64: newLog.Attributes().PutDouble(k, v) case bool: newLog.Attributes().PutBool(k, v) default: logger.Warn("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k)) } } return nil } // GetParentID returns the parent id from the event or an error if it's not found func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) { if pid, ok := l.Data[fieldName]; ok { pid := strings.ReplaceAll(pid.(string), "-", "") pidByteArray, err := hex.DecodeString(pid) if err == nil { if len(pidByteArray) == 32 { pidByteArray = pidByteArray[8:24] } else if len(pidByteArray) >= 16 { pidByteArray = pidByteArray[0:16] } return trc.SpanID(pidByteArray), nil } return trc.SpanID{}, errors.New("parent id is not a valid span id") } return trc.SpanID{}, errors.New("parent id not found") } // ToPTraceSpan converts a LibhoneyEvent to a Pdata Span func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error { timeNs := l.MsgPackTimestamp.UnixNano() logger.Debug("processing trace with", zap.Int64("timestamp", timeNs)) var parentID trc.SpanID if pid, ok := l.Data[cfg.Attributes.ParentID]; ok { parentID = spanIDFrom(pid.(string)) newSpan.SetParentSpanID(pcommon.SpanID(parentID)) } durationMs := 0.0 for _, df := range cfg.Attributes.DurationFields { if duration, okay := l.Data[df]; okay { durationMs = duration.(float64) break } } endTimestamp := timeNs + (int64(durationMs) * 1000000) if tid, ok := l.Data[cfg.Attributes.TraceID]; ok { tid := strings.ReplaceAll(tid.(string), "-", "") tidByteArray, err := hex.DecodeString(tid) if err == nil { if len(tidByteArray) >= 32 { tidByteArray = tidByteArray[0:32] } newSpan.SetTraceID(pcommon.TraceID(tidByteArray)) } else { newSpan.SetTraceID(pcommon.TraceID(traceIDFrom(tid))) } } else { newSpan.SetTraceID(pcommon.TraceID(generateAnID(32))) } if sid, ok := l.Data[cfg.Attributes.SpanID]; ok { sid := strings.ReplaceAll(sid.(string), "-", "") sidByteArray, err := hex.DecodeString(sid) if err == nil { if len(sidByteArray) == 32 { sidByteArray = sidByteArray[8:24] } else if len(sidByteArray) >= 16 { sidByteArray = sidByteArray[0:16] } newSpan.SetSpanID(pcommon.SpanID(sidByteArray)) } else { newSpan.SetSpanID(pcommon.SpanID(spanIDFrom(sid))) } } else { newSpan.SetSpanID(pcommon.SpanID(generateAnID(16))) } newSpan.SetStartTimestamp(pcommon.Timestamp(timeNs)) newSpan.SetEndTimestamp(pcommon.Timestamp(endTimestamp)) if spanName, ok := l.Data[cfg.Attributes.Name]; ok { newSpan.SetName(spanName.(string)) } if spanStatusMessage, ok := l.Data["status_message"]; ok { newSpan.Status().SetMessage(spanStatusMessage.(string)) } newSpan.Status().SetCode(ptrace.StatusCodeUnset) if _, ok := l.Data[cfg.Attributes.Error]; ok { newSpan.Status().SetCode(ptrace.StatusCodeError) } if spanKind, ok := l.Data[cfg.Attributes.SpanKind]; ok { switch spanKind.(string) { case "server": newSpan.SetKind(ptrace.SpanKindServer) case "client": newSpan.SetKind(ptrace.SpanKindClient) case "producer": newSpan.SetKind(ptrace.SpanKindProducer) case "consumer": newSpan.SetKind(ptrace.SpanKindConsumer) case "internal": newSpan.SetKind(ptrace.SpanKindInternal) default: newSpan.SetKind(ptrace.SpanKindUnspecified) } } newSpan.Attributes().PutInt("SampleRate", int64(l.Samplerate)) for k, v := range l.Data { if slices.Contains(*alreadyUsedFields, k) { continue } switch v := v.(type) { case string: newSpan.Attributes().PutStr(k, v) case int: newSpan.Attributes().PutInt(k, int64(v)) case int64, int16, int32: intv := v.(int64) newSpan.Attributes().PutInt(k, intv) case float64: newSpan.Attributes().PutDouble(k, v) case bool: newSpan.Attributes().PutBool(k, v) default: logger.Warn("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k)) } } return nil }