receiver/zipkinreceiver/trace_receiver.go (203 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package zipkinreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
import (
	"bufio"
	"compress/gzip"
	"compress/zlib"
	"context"
	"errors"
	"io"
	"net"
	"net/http"
	"strings"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componentstatus"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/receiverhelper"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
)
// Transport names, one per supported Zipkin wire format. newReceiver creates
// one ObsReport per name and transportType selects the name that matches an
// incoming request.
const (
	// Zipkin v1 endpoint with an "application/x-thrift" body.
	receiverTransportV1Thrift = "http_v1_thrift"
	// Zipkin v1 endpoint with any other content type.
	receiverTransportV1JSON = "http_v1_json"
	// Zipkin v2 endpoint with any content type other than protobuf.
	receiverTransportV2JSON = "http_v2_json"
	// Zipkin v2 endpoint with an "application/x-protobuf" body.
	receiverTransportV2PROTO = "http_v2_proto"
)
// Response bodies written by ServeHTTP when the next consumer rejects the
// data. Both are quoted so that the payload is a valid JSON string.
var (
	// Sent with 500 when the consumer error is not permanent.
	errNextConsumerRespBody = []byte(`"Internal Server Error"`)
	// Sent with 400 when the consumer error is permanent.
	errBadRequestRespBody = []byte(`"Bad Request"`)
)
// zipkinReceiver type is used to handle spans received in the Zipkin format.
// It serves HTTP itself (see ServeHTTP) and hands every decoded batch to
// nextConsumer.
type zipkinReceiver struct {
	// nextConsumer receives the traces decoded from each request.
	nextConsumer consumer.Traces

	// shutdownWG tracks the goroutine started by Start that runs the HTTP
	// server; Shutdown waits on it.
	shutdownWG sync.WaitGroup
	// server is created in Start and closed in Shutdown; nil before Start.
	server *http.Server
	// config supplies the HTTP server/listener settings and ParseStringTags.
	config *Config

	// One unmarshaler per supported payload format.
	v1ThriftUnmarshaler ptrace.Unmarshaler
	v1JSONUnmarshaler   ptrace.Unmarshaler
	jsonUnmarshaler     ptrace.Unmarshaler
	protobufUnmarshaler ptrace.Unmarshaler
	// protobufDebugUnmarshaler is used instead of protobufUnmarshaler when the
	// request carries the header "X-B3-Flags: 1".
	protobufDebugUnmarshaler ptrace.Unmarshaler

	// settings are the receiver settings passed to newReceiver.
	settings receiver.Settings
	// obsrecvrs maps a transport name (receiverTransport* constants) to the
	// ObsReport used to record operations for that transport.
	obsrecvrs map[string]*receiverhelper.ObsReport
}
// Compile-time check that *zipkinReceiver implements http.Handler.
var _ http.Handler = (*zipkinReceiver)(nil)
// newReceiver builds a zipkinReceiver that forwards decoded traces to
// nextConsumer.
//
// One ObsReport is created for each supported transport (v1 Thrift, v1 JSON,
// v2 JSON, v2 Protobuf) and stored under the transport name. The first
// ObsReport that fails to construct aborts the whole call and its error is
// returned unchanged. All unmarshalers that accept the option are configured
// with config.ParseStringTags.
func newReceiver(config *Config, nextConsumer consumer.Traces, settings receiver.Settings) (*zipkinReceiver, error) {
	reports := make(map[string]*receiverhelper.ObsReport)
	for _, transport := range []string{
		receiverTransportV1Thrift,
		receiverTransportV1JSON,
		receiverTransportV2JSON,
		receiverTransportV2PROTO,
	} {
		report, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
			ReceiverID:             settings.ID,
			Transport:              transport,
			ReceiverCreateSettings: settings,
		})
		if err != nil {
			return nil, err
		}
		reports[transport] = report
	}

	parseTags := config.ParseStringTags
	return &zipkinReceiver{
		nextConsumer:             nextConsumer,
		config:                   config,
		v1ThriftUnmarshaler:      zipkinv1.NewThriftTracesUnmarshaler(),
		v1JSONUnmarshaler:        zipkinv1.NewJSONTracesUnmarshaler(parseTags),
		jsonUnmarshaler:          zipkinv2.NewJSONTracesUnmarshaler(parseTags),
		protobufUnmarshaler:      zipkinv2.NewProtobufTracesUnmarshaler(false, parseTags),
		protobufDebugUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(true, parseTags),
		settings:                 settings,
		obsrecvrs:                reports,
	}, nil
}
// Start creates the HTTP server and listener described by the receiver's
// config and begins serving requests on a background goroutine.
//
// It returns an error when host is nil or when the server or the listener
// cannot be created. Once serving has begun, any Serve error other than
// http.ErrServerClosed is reported to host as a fatal component status event.
// The goroutine is tracked by shutdownWG so that Shutdown can wait for it.
func (zr *zipkinReceiver) Start(ctx context.Context, host component.Host) error {
	if host == nil {
		return errors.New("nil host")
	}

	var err error
	if zr.server, err = zr.config.ToServer(ctx, host, zr.settings.TelemetrySettings, zr); err != nil {
		return err
	}

	ln, err := zr.config.ToListener(ctx)
	if err != nil {
		return err
	}

	zr.shutdownWG.Add(1)
	go func() {
		defer zr.shutdownWG.Done()

		serveErr := zr.server.Serve(ln)
		if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
			// Clean stop: nothing to report.
			return
		}
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(serveErr))
	}()

	return nil
}
// v1ToTraceSpans decodes a Zipkin v1 payload into ptrace.Traces.
//
// The Thrift unmarshaler is used when the Content-Type header is exactly
// "application/x-thrift"; every other content type is decoded with the v1
// JSON unmarshaler.
func (zr *zipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
	unmarshaler := zr.v1JSONUnmarshaler
	if hdr.Get("Content-Type") == "application/x-thrift" {
		unmarshaler = zr.v1ThriftUnmarshaler
	}
	return unmarshaler.UnmarshalTraces(blob)
}
// v2ToTraceSpans decodes a Zipkin v2 payload into ptrace.Traces.
//
// JSON is assumed unless the Content-Type header is exactly
// "application/x-protobuf". For protobuf payloads the debug unmarshaler is
// chosen when the request carries "X-B3-Flags: 1"; the flag is described at
// https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
func (zr *zipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs ptrace.Traces, err error) {
	if hdr.Get("Content-Type") != "application/x-protobuf" {
		return zr.jsonUnmarshaler.UnmarshalTraces(blob)
	}

	// TODO: (@odeke-em) record the unique types of Content-Type uploads
	if hdr.Get("X-B3-Flags") == "1" {
		return zr.protobufDebugUnmarshaler.UnmarshalTraces(blob)
	}
	return zr.protobufUnmarshaler.UnmarshalTraces(blob)
}
// Shutdown stops the receiver: it closes the HTTP server, if Start created
// one, and then blocks until the serving goroutine has exited. The returned
// error is whatever closing the server reported (nil when there is no server).
// The context argument is not used.
func (zr *zipkinReceiver) Shutdown(context.Context) error {
	// Deferred so the wait happens after the server has been closed, on both
	// return paths.
	defer zr.shutdownWG.Wait()

	if zr.server == nil {
		return nil
	}
	return zr.server.Close()
}
// processBodyIfNecessary returns a reader for the request body that undoes
// the compression named by the "Content-Encoding" header: "gzip" selects
// gunzip, "deflate" and "zlib" select zlib. Any other value (including an
// absent header) yields the body unchanged. Clients such as Zipkin-Java send
// e.g. "Content-Encoding":"gzip" together with JSON content.
func processBodyIfNecessary(req *http.Request) io.Reader {
	encoding := req.Header.Get("Content-Encoding")
	if encoding == "gzip" {
		return gunzippedBodyIfPossible(req.Body)
	}
	if encoding == "deflate" || encoding == "zlib" {
		return zlibUncompressedbody(req.Body)
	}
	return req.Body
}
// gunzippedBodyIfPossible returns a reader that yields the gunzipped content
// of r when r starts with a gzip header, and otherwise a reader that yields
// the content of r unchanged.
//
// The header is inspected with Peek on a bufio.Reader instead of handing r
// straight to gzip.NewReader: gzip.NewReader consumes the header bytes (and,
// for readers without ReadByte, a whole internal buffer) before it reports a
// bad header, so falling back to r afterwards would silently drop the start
// of an uncompressed body.
//
// When the returned reader is a *gzip.Reader it implements io.Closer.
func gunzippedBodyIfPossible(r io.Reader) io.Reader {
	br := bufio.NewReader(r)

	// RFC 1952: the fixed gzip header is 10 bytes long and begins with
	// ID1=0x1f, ID2=0x8b, CM=8 (deflate). A failed Peek leaves the bytes that
	// were read buffered in br, so nothing is lost on the fallback path.
	hdr, err := br.Peek(10)
	if err != nil || hdr[0] != 0x1f || hdr[1] != 0x8b || hdr[2] != 8 {
		return br
	}

	gzr, err := gzip.NewReader(br)
	if err != nil {
		// The stream claimed to be gzip but its header is corrupt; there is no
		// meaningful plain body to recover, hand back what is left.
		return br
	}
	return gzr
}
// zlibUncompressedbody returns a reader that yields the zlib-inflated content
// of r when r starts with a valid zlib header, and otherwise a reader that
// yields the content of r unchanged.
//
// The header is inspected with Peek on a bufio.Reader instead of handing r
// straight to zlib.NewReader: zlib.NewReader consumes the header bytes before
// it reports a bad header, so falling back to r afterwards would silently
// drop the start of an uncompressed body.
//
// When the returned reader comes from zlib.NewReader it implements io.Closer.
func zlibUncompressedbody(r io.Reader) io.Reader {
	br := bufio.NewReader(r)

	// RFC 1950: CMF low nibble is the method (8 = deflate), CMF high nibble
	// is the window size (at most 7), and CMF*256+FLG is a multiple of 31.
	// A failed Peek leaves the bytes that were read buffered in br.
	hdr, err := br.Peek(2)
	if err != nil ||
		hdr[0]&0x0f != 8 ||
		hdr[0]>>4 > 7 ||
		(uint16(hdr[0])<<8|uint16(hdr[1]))%31 != 0 {
		return br
	}

	zlr, err := zlib.NewReader(br)
	if err != nil {
		// Header looked valid but the stream cannot be opened (for example it
		// requires a preset dictionary); hand back what is left.
		return br
	}
	return zlr
}
// Format names passed by ServeHTTP to ObsReport.EndTracesOp, chosen by
// whether the request was handled as Zipkin v1 or v2.
const (
	zipkinV1TagValue = "zipkinV1"
	zipkinV2TagValue = "zipkinV2"
)
// ServeHTTP handles one Zipkin span upload.
//
// Requests whose URL path contains "api/v1/spans" are decoded as Zipkin v1
// (Thrift or JSON); every other request is decoded as Zipkin v2 (JSON or
// Protobuf). The body is decompressed according to its Content-Encoding, read
// fully into memory, unmarshalled and passed to nextConsumer. The outcome is
// mapped to an HTTP status:
//
//   - 202 Accepted when the consumer accepts the data, as required at
//     https://zipkin.io/zipkin-api/#/default/post_spans
//   - 400 Bad Request when the body cannot be read or decoded, or when the
//     consumer reports a permanent error
//   - 500 Internal Server Error for any other consumer error
//
// The operation started with StartTracesOp is ended with EndTracesOp on every
// path, including the read and decode failures.
func (zr *zipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")
	receiverTagValue := zipkinV2TagValue
	if asZipkinv1 {
		receiverTagValue = zipkinV1TagValue
	}

	transportTag := transportType(r, asZipkinv1)
	obsrecv := zr.obsrecvrs[transportTag]
	ctx = obsrecv.StartTracesOp(ctx)

	pr := processBodyIfNecessary(r)
	slurp, err := io.ReadAll(pr)
	// Close errors are ignored on purpose: the payload has already been read
	// (or the read error below is the one worth reporting).
	if c, ok := pr.(io.Closer); ok {
		_ = c.Close()
	}
	_ = r.Body.Close()
	if err != nil {
		// Do not decode a partially read body.
		obsrecv.EndTracesOp(ctx, receiverTagValue, 0, err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Now deserialize and process the spans.
	var td ptrace.Traces
	if asZipkinv1 {
		td, err = zr.v1ToTraceSpans(slurp, r.Header)
	} else {
		td, err = zr.v2ToTraceSpans(slurp, r.Header)
	}
	if err != nil {
		// Nothing was decoded, so zero spans are reported for this operation.
		obsrecv.EndTracesOp(ctx, receiverTagValue, 0, err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	numReceivedSpans := td.SpanCount()
	consumerErr := zr.nextConsumer.ConsumeTraces(ctx, td)
	obsrecv.EndTracesOp(ctx, receiverTagValue, numReceivedSpans, consumerErr)

	switch {
	case consumerErr == nil:
		w.WriteHeader(http.StatusAccepted)
	case consumererror.IsPermanent(consumerErr):
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write(errBadRequestRespBody)
	default:
		// Transient error, due to some internal condition.
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write(errNextConsumerRespBody)
	}
}
// transportType returns the transport name used to pick the ObsReport for a
// request. The name depends on whether the request is handled as Zipkin v1
// and on an exact match of its Content-Type header:
//
//   - v1 with "application/x-thrift"      -> receiverTransportV1Thrift
//   - v1 with anything else               -> receiverTransportV1JSON
//   - v2 with "application/x-protobuf"    -> receiverTransportV2PROTO
//   - v2 with anything else               -> receiverTransportV2JSON
func transportType(r *http.Request, asZipkinv1 bool) string {
	contentType := r.Header.Get("Content-Type")
	switch {
	case asZipkinv1 && contentType == "application/x-thrift":
		return receiverTransportV1Thrift
	case asZipkinv1:
		return receiverTransportV1JSON
	case contentType == "application/x-protobuf":
		return receiverTransportV2PROTO
	default:
		return receiverTransportV2JSON
	}
}