receiver/kafkareceiver/encoding.go (116 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
import (
"errors"
"fmt"
"strings"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/unmarshaler"
)
var errUnknownEncodingExtension = errors.New("unknown encoding extension")
func newTracesUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (ptrace.Unmarshaler, error) {
// Extensions take precedence.
if unmarshaler, err := loadEncodingExtension[ptrace.Unmarshaler](host, encoding, "traces"); err != nil {
if !errors.Is(err, errUnknownEncodingExtension) {
return nil, err
}
} else {
return unmarshaler, nil
}
switch encoding {
case "otlp_proto":
return &ptrace.ProtoUnmarshaler{}, nil
case "otlp_json":
return &ptrace.JSONUnmarshaler{}, nil
case "jaeger_proto":
return unmarshaler.JaegerProtoSpanUnmarshaler{}, nil
case "jaeger_json":
return unmarshaler.JaegerJSONSpanUnmarshaler{}, nil
case "zipkin_proto":
return zipkinv2.NewProtobufTracesUnmarshaler(false, false), nil
case "zipkin_json":
return zipkinv2.NewJSONTracesUnmarshaler(false), nil
case "zipkin_thrift":
return zipkinv1.NewThriftTracesUnmarshaler(), nil
}
return nil, fmt.Errorf("unrecognized traces encoding %q", encoding)
}
func newLogsUnmarshaler(encoding string, set receiver.Settings, host component.Host) (plog.Unmarshaler, error) {
// Extensions take precedence.
if unmarshaler, err := loadEncodingExtension[plog.Unmarshaler](host, encoding, "logs"); err != nil {
if !errors.Is(err, errUnknownEncodingExtension) {
return nil, err
}
} else {
return unmarshaler, nil
}
switch encoding {
case "otlp_proto":
return &plog.ProtoUnmarshaler{}, nil
case "otlp_json":
return &plog.JSONUnmarshaler{}, nil
case "raw":
return unmarshaler.RawLogsUnmarshaler{}, nil
case "json":
return unmarshaler.JSONLogsUnmarshaler{}, nil
case "azure_resource_logs":
return &azure.ResourceLogsUnmarshaler{
Version: set.BuildInfo.Version,
Logger: set.Logger,
}, nil
case "text":
return unmarshaler.NewTextLogsUnmarshaler("utf-8")
}
// There is a special case for text-based encodings, where you can specify
// the text encoding (e.g. utf8, utf16) as a suffix in the encoding name.
if textEncodingName, ok := strings.CutPrefix(encoding, "text_"); ok {
u, err := unmarshaler.NewTextLogsUnmarshaler(textEncodingName)
if err != nil {
return nil, fmt.Errorf("invalid text encoding: %w", err)
}
return u, nil
}
return nil, fmt.Errorf("unrecognized logs encoding %q", encoding)
}
func newMetricsUnmarshaler(encoding string, _ receiver.Settings, host component.Host) (pmetric.Unmarshaler, error) {
// Extensions take precedence.
if unmarshaler, err := loadEncodingExtension[pmetric.Unmarshaler](host, encoding, "metrics"); err != nil {
if !errors.Is(err, errUnknownEncodingExtension) {
return nil, err
}
} else {
return unmarshaler, nil
}
switch encoding {
case "otlp_proto":
return &pmetric.ProtoUnmarshaler{}, nil
case "otlp_json":
return &pmetric.JSONUnmarshaler{}, nil
}
return nil, fmt.Errorf("unrecognized metrics encoding %q", encoding)
}
// loadEncodingExtension tries to load an available extension for the given encoding.
func loadEncodingExtension[T any](host component.Host, encoding, signalType string) (T, error) {
var zero T
extensionID, err := encodingToComponentID(encoding)
if err != nil {
return zero, err
}
encodingExtension, ok := host.GetExtensions()[*extensionID]
if !ok {
return zero, fmt.Errorf("invalid encoding %q: %w", encoding, errUnknownEncodingExtension)
}
unmarshaler, ok := encodingExtension.(T)
if !ok {
return zero, fmt.Errorf("extension %q is not a %s unmarshaler", encoding, signalType)
}
return unmarshaler, nil
}
// encodingToComponentID converts an encoding string to a component ID using the given encoding as type.
func encodingToComponentID(encoding string) (*component.ID, error) {
componentType, err := component.NewType(encoding)
if err != nil {
return nil, fmt.Errorf("invalid component type: %w", err)
}
id := component.NewID(componentType)
return &id, nil
}