extension/encoding/googlecloudlogentryencodingextension/log_entry.go (242 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package googlecloudlogentryencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension"
import (
"encoding/hex"
stdjson "encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"cloud.google.com/go/logging/apiv2/loggingpb"
"github.com/iancoleman/strcase"
jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
)
// json is the json-iterator codec configured to be compatible with the
// standard library's encoding/json. It is declared under the name "json",
// which is why the standard package is imported as stdjson in this file.
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var (
// invalidTraceID is the all-zero trace ID returned together with an error.
invalidTraceID = [16]byte{}
// invalidSpanID is the all-zero span ID returned together with an error.
invalidSpanID = [8]byte{}
// errorParsingLogItem is the sentinel returned when a trace or span ID
// string does not have the expected shape.
errorParsingLogItem = errors.New("error parsing log item")
)
// cloudLoggingTraceToTraceIDBytes pulls the trace ID out of a Cloud Logging
// trace resource name and decodes it into its 16-byte form.
//
// The expected input looks like
// projects/my-gcp-project/traces/4ebc71f1def9274798cac4e8960d0095; everything
// after the final "/" is handed to traceIDStrToTraceIDBytes. An input that
// contains no "/" at all yields invalidTraceID and errorParsingLogItem.
func cloudLoggingTraceToTraceIDBytes(trace string) ([16]byte, error) {
	sep := strings.LastIndex(trace, "/")
	if sep < 0 {
		return invalidTraceID, errorParsingLogItem
	}
	return traceIDStrToTraceIDBytes(trace[sep+1:])
}
// traceIDStrToTraceIDBytes decodes a hex-encoded trace ID into a 16-byte array.
//
// A string that is not valid hex returns invalidTraceID together with the
// decoder's error; valid hex that does not decode to exactly 16 bytes returns
// invalidTraceID and errorParsingLogItem.
func traceIDStrToTraceIDBytes(traceIDStr string) ([16]byte, error) {
	raw, err := hex.DecodeString(traceIDStr)
	switch {
	case err != nil:
		return invalidTraceID, err
	case len(raw) != 16:
		return invalidTraceID, errorParsingLogItem
	}
	var id [16]byte
	copy(id[:], raw)
	return id, nil
}
// spanIDStrToSpanIDBytes decodes a hex-encoded span ID into an 8-byte array.
//
// A string that is not valid hex returns invalidSpanID together with the
// decoder's error; valid hex that does not decode to exactly 8 bytes returns
// invalidSpanID and errorParsingLogItem.
func spanIDStrToSpanIDBytes(spanIDStr string) ([8]byte, error) {
	raw, err := hex.DecodeString(spanIDStr)
	switch {
	case err != nil:
		return invalidSpanID, err
	case len(raw) != 8:
		return invalidSpanID, errorParsingLogItem
	}
	var id [8]byte
	copy(id[:], raw)
	return id, nil
}
// cloudLoggingSeverityToNumber maps a Cloud Logging LogSeverity name to the
// corresponding plog severity number.
//
// "DEFAULT" and any name not listed below map to
// plog.SeverityNumberUnspecified. Matching is exact and case-sensitive.
//
// See https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
func cloudLoggingSeverityToNumber(severity string) plog.SeverityNumber {
	number := plog.SeverityNumberUnspecified
	switch severity {
	case "DEBUG":
		number = plog.SeverityNumberDebug
	case "INFO":
		number = plog.SeverityNumberInfo
	case "NOTICE":
		number = plog.SeverityNumberInfo2
	case "WARNING":
		number = plog.SeverityNumberWarn
	case "ERROR":
		number = plog.SeverityNumberError
	case "CRITICAL":
		number = plog.SeverityNumberFatal
	case "ALERT":
		number = plog.SeverityNumberFatal2
	case "EMERGENCY":
		number = plog.SeverityNumberFatal4
	case "DEFAULT":
		// Listed explicitly so it is clear this name is known; it keeps
		// the unspecified severity number.
	}
	return number
}
var (
// desc caches the LogEntry message descriptor; it is populated by
// getLogEntryDescriptor.
desc protoreflect.MessageDescriptor
// descOnce guards the one-time initialization of desc.
descOnce sync.Once
)
// getLogEntryDescriptor returns the protobuf message descriptor of
// loggingpb.LogEntry. The descriptor is resolved on the first call and the
// cached value is returned on every later call.
func getLogEntryDescriptor() protoreflect.MessageDescriptor {
	descOnce.Do(func() {
		desc = (&loggingpb.LogEntry{}).ProtoReflect().Descriptor()
	})
	return desc
}
// extractFn is the signature shared by the per-field handlers in this file.
// Judging by the handlers' named parameters, the arguments are, in order: the
// resource attributes, the log record being populated, the log record
// attributes, the JSON key of the field, the raw JSON value of the field, and
// the extension configuration.
type extractFn func(pcommon.Map, plog.LogRecord, pcommon.Map, string, stdjson.RawMessage, Config) error
// handleTimestamp decodes the JSON value as a time and stores it as the log
// record's timestamp.
//
// timestamp -> Timestamp
func handleTimestamp(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var ts time.Time
	if err := json.Unmarshal(value, &ts); err != nil {
		return err
	}
	logRecord.SetTimestamp(pcommon.NewTimestampFromTime(ts))
	return nil
}
// handleReceiveTimestamp decodes the JSON value as a time and stores it as the
// log record's observed timestamp.
//
// receiveTimestamp -> ObservedTimestamp
func handleReceiveTimestamp(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var observed time.Time
	if err := json.Unmarshal(value, &observed); err != nil {
		return err
	}
	logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(observed))
	return nil
}
// handleInsertID decodes the insertId field of a LogEntry as a string and
// stores it in the log record attributes.
//
// insertId -> Attributes["log.record.uid"] (stability: experimental)
//
// See https://github.com/open-telemetry/semantic-conventions/blob/main/model/log/registry.yaml
func handleInsertID(_ pcommon.Map, _ plog.LogRecord, logAttributes pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var uid string
	if err := json.Unmarshal(value, &uid); err != nil {
		return err
	}
	logAttributes.PutStr("log.record.uid", uid)
	return nil
}
// handleLogName decodes the logName field of a LogEntry as a string and stores
// it in the log record attributes.
//
// logName -> Attributes["gcp.log_name"] (stability: experimental)
func handleLogName(_ pcommon.Map, _ plog.LogRecord, logAttributes pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var name string
	if err := json.Unmarshal(value, &name); err != nil {
		return err
	}
	logAttributes.PutStr("gcp.log_name", name)
	return nil
}
// handleResource decodes the resource field as a MonitoredResource (using
// protojson) and copies it into the resource attributes.
//
// resource -> Resource
//   - type   -> gcp.resource_type
//   - labels -> "gcp.<label>", with the key passed through
//     strcase.ToSnakeWithIgnore using "." as the ignored character
func handleResource(resourceAttributes pcommon.Map, _ plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var monitored monitoredres.MonitoredResource
	if err := protojson.Unmarshal(value, &monitored); err != nil {
		return err
	}

	resourceAttributes.PutStr("gcp.resource_type", monitored.GetType())
	for label, labelValue := range monitored.GetLabels() {
		attrKey := strcase.ToSnakeWithIgnore("gcp."+label, ".")
		resourceAttributes.PutStr(attrKey, labelValue)
	}
	return nil
}
// setBodyFromText sets the log record body to the given string. It always
// returns nil; the error result lets it be returned directly from handlers.
func setBodyFromText(logRecord plog.LogRecord, value string) error {
	body := logRecord.Body()
	body.SetStr(value)
	return nil
}
// handleTextPayload decodes the JSON value as a string and uses it as the log
// record body.
//
// textPayload -> Body
func handleTextPayload(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var text string
	if err := json.Unmarshal(value, &text); err != nil {
		return err
	}
	return setBodyFromText(logRecord, text)
}
// setBodyFromJSON decodes the raw JSON value into a generic Go value and
// stores it as the log record body.
//
// {json,proto,text}_payload -> Body
//
// It returns the decoding error if value is not valid JSON, or the error
// reported by Body().FromRaw if the decoded value cannot be stored.
func setBodyFromJSON(logRecord plog.LogRecord, value stdjson.RawMessage) error {
	var payload any
	if err := json.Unmarshal(value, &payload); err != nil {
		return err
	}
	// Note: json.Unmarshal will turn a bare string into a Go string, so
	// this call will correctly set the body to a string Value.
	//
	// The error from FromRaw is propagated rather than discarded so that a
	// value it cannot represent is reported to the caller.
	return logRecord.Body().FromRaw(payload)
}
// handleJSONPayload sets the log record body from the jsonPayload field,
// according to config.HandleJSONPayloadAs:
//   - HandleAsJSON: the value is parsed and stored as structured data.
//   - HandleAsText: the raw JSON text is stored as a string.
//
// Any other setting results in an error.
func handleJSONPayload(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, config Config) error {
	switch config.HandleJSONPayloadAs {
	case HandleAsJSON:
		return setBodyFromJSON(logRecord, value)
	case HandleAsText:
		return setBodyFromText(logRecord, string(value))
	default:
		// This handler deals with the JSON payload, so the message must
		// not refer to the proto payload.
		return errors.New("unrecognized JSON payload type")
	}
}
// handleSeverity decodes the JSON value as a string and records it as both the
// severity text and the mapped severity number of the log record.
//
// severity -> Severity
//
// According to the spec, the severity text is the original string
// representation of the severity as it is known at the source:
// https://opentelemetry.io/docs/reference/specification/logs/data-model/#field-severitytext
func handleSeverity(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var level string
	if err := json.Unmarshal(value, &level); err != nil {
		return err
	}
	logRecord.SetSeverityText(level)
	logRecord.SetSeverityNumber(cloudLoggingSeverityToNumber(level))
	return nil
}
// handleTrace decodes the JSON value as a string, converts it with
// cloudLoggingTraceToTraceIDBytes, and sets the result as the log record's
// trace ID. Decoding and conversion errors are returned unchanged.
//
// trace -> TraceID
func handleTrace(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var traceName string
	if err := json.Unmarshal(value, &traceName); err != nil {
		return err
	}

	id, err := cloudLoggingTraceToTraceIDBytes(traceName)
	if err != nil {
		return err
	}
	logRecord.SetTraceID(id)
	return nil
}
// handleSpanID decodes the JSON value as a string, converts it with
// spanIDStrToSpanIDBytes, and sets the result as the log record's span ID.
// Decoding and conversion errors are returned unchanged.
//
// spanId -> SpanID
func handleSpanID(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var encoded string
	if err := json.Unmarshal(value, &encoded); err != nil {
		return err
	}

	id, err := spanIDStrToSpanIDBytes(encoded)
	if err != nil {
		return err
	}
	logRecord.SetSpanID(id)
	return nil
}
// handleTraceSampled decodes the JSON value as a boolean and records it as the
// "is sampled" bit of the log record flags.
//
// traceSampled -> Flags (sampled bit)
func handleTraceSampled(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var traceSampled bool
	if err := json.Unmarshal(value, &traceSampled); err != nil {
		return err
	}
	// Start from the record's current flags so that only the sampled bit
	// changes; building the flags from a zero value would clear any other
	// bits already present on the record.
	logRecord.SetFlags(logRecord.Flags().WithIsSampled(traceSampled))
	return nil
}
// handleLabels decodes the JSON value as a string-to-string map and copies
// every entry into the log record attributes.
//
// labels -> Attributes, stored under "gcp.<label>" with the key passed through
// strcase.ToSnakeWithIgnore using "." as the ignored character.
func handleLabels(_ pcommon.Map, _ plog.LogRecord, logAttributes pcommon.Map, _ string, value stdjson.RawMessage, _ Config) error {
	var entries map[string]string
	if err := json.Unmarshal(value, &entries); err != nil {
		return err
	}

	for label, labelValue := range entries {
		attrKey := strcase.ToSnakeWithIgnore("gcp."+label, ".")
		logAttributes.PutStr(attrKey, labelValue)
	}
	return nil
}
// handleHTTPRequest translates the raw JSON value into a nested map stored in
// the log record attributes under "gcp.http_request".
//
// key is the JSON name of the LogEntry field being handled; it is used to look
// up the field's message descriptor, which is passed to translateInto together
// with snakeifyKeys.
//
// An error wrapping errorParsingLogItem is returned if key does not name a
// LogEntry field or names a field that is not a message. Errors from
// translateInto are returned unchanged.
func handleHTTPRequest(_ pcommon.Map, _ plog.LogRecord, logAttributes pcommon.Map, key string, value stdjson.RawMessage, _ Config) error {
	// ByJSONName returns nil for an unknown name; calling Message on that
	// nil descriptor would panic, so check before going further.
	field := getLogEntryDescriptor().Fields().ByJSONName(key)
	if field == nil {
		return fmt.Errorf("%w: unknown LogEntry field %q", errorParsingLogItem, key)
	}
	msgDesc := field.Message()
	if msgDesc == nil {
		return fmt.Errorf("%w: LogEntry field %q is not a message", errorParsingLogItem, key)
	}

	// The attribute map is created only after the descriptor is resolved,
	// so a bad key does not leave an empty "gcp.http_request" entry behind.
	httpRequestAttrs := logAttributes.PutEmptyMap("gcp.http_request")
	return translateInto(httpRequestAttrs, msgDesc, value, snakeifyKeys)
}
// handleProtoPayload sets the log record body from the protoPayload field,
// according to config.HandleProtoPayloadAs:
//   - HandleAsJSON: delegated to setBodyFromJSON.
//   - HandleAsProtobuf: delegated to setBodyFromProto.
//   - HandleAsText: the raw JSON text is stored as a string.
//
// Any other setting results in an error.
func handleProtoPayload(_ pcommon.Map, logRecord plog.LogRecord, _ pcommon.Map, _ string, value stdjson.RawMessage, config Config) error {
	mode := config.HandleProtoPayloadAs
	if mode == HandleAsJSON {
		return setBodyFromJSON(logRecord, value)
	}
	if mode == HandleAsProtobuf {
		return setBodyFromProto(logRecord, value)
	}
	if mode == HandleAsText {
		return setBodyFromText(logRecord, string(value))
	}
	return errors.New("unrecognized proto payload type")
}