extension/encoding/googlecloudlogentryencodingextension/log_proto_payload.go (277 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package googlecloudlogentryencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension"
import (
"bytes"
stdjson "encoding/json"
"errors"
"fmt"
"strconv"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/anypb"
)
func setBodyFromProto(logRecord plog.LogRecord, value stdjson.RawMessage) error {
err := translateInto(logRecord.Body().SetEmptyMap(), (&anypb.Any{}).ProtoReflect().Descriptor(), value)
return err
}
func (opts fieldTranslateOptions) translateValue(dst pcommon.Value, fd protoreflect.FieldDescriptor, src stdjson.RawMessage) error {
var err error
switch fd.Kind() {
case protoreflect.MessageKind:
msg := fd.Message()
switch fd.Message().FullName() {
case "google.protobuf.Duration", "google.protobuf.Timestamp":
// protojson represents both of these as strings
return translateStr(dst, src)
case "google.protobuf.Struct", "google.protobuf.Value":
return translateRaw(dst, src)
case
"google.protobuf.BoolValue",
"google.protobuf.BytesValue",
"google.protobuf.DoubleValue",
"google.protobuf.FloatValue",
"google.protobuf.Int32Value",
"google.protobuf.Int64Value",
"google.protobuf.StringValue",
"google.protobuf.UInt32Value",
"google.protobuf.UInt64Value":
// All the wrapper types have a single field with name
// `value` and field number 1, and are represented in
// protojson without the wrapping.
innerFd := fd.Message().Fields().ByNumber(1)
_ = opts.translateValue(dst, innerFd, src)
default:
var m pcommon.Map
switch dst.Type() {
case pcommon.ValueTypeMap:
m = dst.Map()
default:
m = dst.SetEmptyMap()
}
return translateInto(m, msg, src)
}
case protoreflect.EnumKind:
// protojson accepts either string name or enum int value; try both.
if translateStr(dst, src) == nil {
return nil
}
enum := fd.Enum()
var i int32
if err = json.Unmarshal(src, &i); err != nil {
return fmt.Errorf("wrong type for enum: %v", getTokenType(src))
}
enumValue := enum.Values().ByNumber(protoreflect.EnumNumber(i))
if enumValue == nil {
return fmt.Errorf("%v has no enum value for %v", enum.FullName(), i)
}
dst.SetStr(string(enumValue.Name()))
case protoreflect.BoolKind:
var val bool
err = json.Unmarshal(src, &val)
if err != nil {
return err
}
dst.SetBool(val)
case protoreflect.Int32Kind,
protoreflect.Uint32Kind,
protoreflect.Sfixed32Kind,
protoreflect.Fixed32Kind,
protoreflect.Sint32Kind,
protoreflect.Int64Kind,
protoreflect.Uint64Kind,
protoreflect.Sfixed64Kind,
protoreflect.Fixed64Kind,
protoreflect.Sint64Kind:
// The protojson encoding accepts either string or number for
// integer types, so try both.
var val int64
if json.Unmarshal(src, &val) == nil {
dst.SetInt(val)
return nil
}
var s string
if err = json.Unmarshal(src, &s); err != nil {
return err
}
if val, err = strconv.ParseInt(s, 10, 64); err != nil {
return err
}
dst.SetInt(val)
return nil
case protoreflect.FloatKind, protoreflect.DoubleKind:
var val float64
err := json.Unmarshal(src, &val)
if err != nil {
return err
}
dst.SetDouble(val)
return nil
case protoreflect.BytesKind:
var val []byte
err := json.Unmarshal(src, &val)
if err != nil {
return err
}
dst.SetEmptyBytes().Append(val...)
return nil
case protoreflect.StringKind:
return translateStr(dst, src)
default:
return errors.New("unknown field kind")
}
return nil
}
func (opts fieldTranslateOptions) translateList(dst pcommon.Slice, fd protoreflect.FieldDescriptor, src stdjson.RawMessage) error {
var msg []stdjson.RawMessage
if err := json.Unmarshal(src, &msg); err != nil {
return err
}
for _, v := range msg {
err := opts.translateValue(dst.AppendEmpty(), fd, v)
if err != nil {
return err
}
}
return nil
}
func (opts fieldTranslateOptions) translateMap(dst pcommon.Map, fd protoreflect.FieldDescriptor, src stdjson.RawMessage) error {
var msg map[string]stdjson.RawMessage
if err := json.Unmarshal(src, &msg); err != nil {
return err
}
for k, v := range msg {
err := opts.translateValue(dst.PutEmpty(k), fd.MapValue(), v)
if err != nil {
return err
}
}
return nil
}
func translateAny(dst pcommon.Map, src map[string]stdjson.RawMessage) error {
// protojson represents Any as the JSON representation of the actual
// message, plus a special @type field containing the type URL of the
// message.
typeURL, ok := src["@type"]
if !ok {
return errors.New("no @type member in Any message")
}
var typeURLStr string
if err := json.Unmarshal(typeURL, &typeURLStr); err != nil {
return err
}
delete(src, "@type")
msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURLStr)
if errors.Is(err, protoregistry.NotFound) {
// If we don't have the type, we do a best-effort JSON decode;
// some ints might be floats or strings.
for k, v := range src {
var val any
err = json.Unmarshal(v, &val)
if err != nil {
return nil
}
_ = dst.PutEmpty(k).FromRaw(val)
}
return nil
}
err = translateInto(dst, msgType.Descriptor(), src)
if err != nil {
return err
}
dst.PutStr("@type", typeURLStr)
return nil
}
func translateProtoMessage(dst pcommon.Map, desc protoreflect.MessageDescriptor, src map[string]stdjson.RawMessage, opts fieldTranslateOptions) error {
if !opts.preserveDst {
dst.Clear()
}
// Handle well-known aggregate types.
switch desc.FullName() {
case "google.protobuf.Any":
return translateAny(dst, src)
case "google.protobuf.Struct":
for k, v := range src {
var val any
if err := json.Unmarshal(v, &val); err != nil {
return err
}
_ = dst.PutEmpty(k).FromRaw(val)
}
return nil
case "google.protobuf.Empty":
dst.Clear()
return nil
default:
}
for k, v := range src {
key := opts.mapKey(k)
fd := desc.Fields().ByJSONName(k)
if fd == nil {
return fmt.Errorf("%v has no known field with JSON name %v", desc.FullName(), k)
}
var err error
switch {
case fd.IsList():
err = opts.translateList(dst.PutEmptySlice(key), fd, v)
case fd.IsMap():
err = opts.translateMap(dst.PutEmptyMap(key), fd, v)
default:
err = opts.translateValue(dst.PutEmpty(key), fd, v)
}
if err != nil {
return err
}
}
return nil
}
func translateInto(dst pcommon.Map, desc protoreflect.MessageDescriptor, src any, opts ...fieldTranslateFn) error {
var toTranslate map[string]stdjson.RawMessage
switch msg := src.(type) {
case stdjson.RawMessage:
err := json.Unmarshal(msg, &toTranslate)
if err != nil {
return err
}
case map[string]stdjson.RawMessage:
toTranslate = msg
}
options := fieldTranslateOptions{}
for _, opt := range opts {
opt(&options)
}
return translateProtoMessage(dst, desc, toTranslate, options)
}
func translateStr(dst pcommon.Value, src stdjson.RawMessage) error {
var val string
err := json.Unmarshal(src, &val)
if err != nil {
return err
}
dst.SetStr(val)
return nil
}
func translateRaw(dst pcommon.Value, src stdjson.RawMessage) error {
var val any
err := json.Unmarshal(src, &val)
if err != nil {
return err
}
_ = dst.FromRaw(val)
return nil
}
func getTokenType(src stdjson.RawMessage) string {
dec := stdjson.NewDecoder(bytes.NewReader(src))
tok, err := dec.Token()
if err != nil {
return "invalid json"
}
switch t := tok.(type) {
case stdjson.Delim:
switch t {
case '[':
return "array"
case '{':
return "object"
default:
return "invalid json"
}
case bool:
return "bool"
case float64, stdjson.Number:
return "number"
case string:
return "string"
case nil:
return "null"
default:
return "unknown"
}
}