pkg/tracegen/otlp.go (235 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package tracegen import ( "context" "crypto/tls" "errors" "fmt" "net" "net/url" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" grpcinsecure "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" ) // SendOTLPTrace sends spans, error and logs to the configured APM Server // If distributed tracing is needed, you might want to set up the propagator // using SetOTLPTracePropagator function before calling this function func SendOTLPTrace(ctx context.Context, cfg Config) (EventStats, error) { if err := cfg.validate(); err != nil { return EventStats{}, err } endpointURL, err := url.Parse(cfg.apmServerURL) if err != nil { return EventStats{}, fmt.Errorf("failed to parse endpoint: %w", err) } switch endpointURL.Scheme { case "http": if endpointURL.Port() == "" { endpointURL.Host = net.JoinHostPort(endpointURL.Host, "80") } case "https": if endpointURL.Port() == "" { endpointURL.Host = net.JoinHostPort(endpointURL.Host, "443") } default: return EventStats{}, fmt.Errorf("endpoint must be prefixed with http:// or https://") } otlpExporters, err := newOTLPExporters(ctx, endpointURL, cfg) if err != nil { return EventStats{}, err } defer otlpExporters.cleanup(ctx) resource := resource.NewSchemaless( attribute.String("service.name", cfg.otlpServiceName), ) tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithSyncer(otlpExporters.trace), sdktrace.WithResource(resource), ) // generateSpans returns ctx that contains trace context var stats EventStats ctx, err = generateSpans(ctx, tracerProvider.Tracer("tracegen"), &stats) if err != nil { return EventStats{}, err } if err := generateLogs(ctx, otlpExporters.log, resource, &stats); err != nil { return EventStats{}, err } // Shutdown, flushing all data to the server. if err := tracerProvider.Shutdown(ctx); err != nil { return EventStats{}, err } if err := otlpExporters.cleanup(ctx); err != nil { return EventStats{}, err } return stats, nil } func generateSpans(ctx context.Context, tracer trace.Tracer, stats *EventStats) (context.Context, error) { now := time.Now() ctx, parent := tracer.Start(ctx, "parent", trace.WithSpanKind(trace.SpanKindServer), trace.WithTimestamp(now), ) defer parent.End(trace.WithTimestamp(now.Add(time.Millisecond * 1500))) stats.SpansSent++ _, child1 := tracer.Start(ctx, "child1", trace.WithTimestamp(now.Add(time.Millisecond*500))) time.Sleep(10 * time.Millisecond) child1.AddEvent("an arbitrary event") child1.End(trace.WithTimestamp(now.Add(time.Second * 1))) stats.SpansSent++ stats.LogsSent++ // span event is captured as a log _, child2 := tracer.Start(ctx, "child2", trace.WithTimestamp(now.Add(time.Millisecond*600))) time.Sleep(10 * time.Millisecond) child2.RecordError(errors.New("an exception occurred")) child2.End(trace.WithTimestamp(now.Add(time.Millisecond * 1300))) stats.SpansSent++ stats.ExceptionsSent++ // error captured as an error/exception log event return ctx, nil } func generateLogs(ctx context.Context, logger otlplogExporter, res *resource.Resource, stats *EventStats) error { logs := plog.NewLogs() rl := logs.ResourceLogs().AppendEmpty() attribs := rl.Resource().Attributes() for iter := res.Iter(); iter.Next(); { kv := iter.Attribute() switch typ := kv.Value.Type(); typ { case attribute.STRING: attribs.PutStr(string(kv.Key), kv.Value.AsString()) default: panic(fmt.Errorf("unhandled attribute type %q", typ)) } } sl := rl.ScopeLogs().AppendEmpty().LogRecords() record := sl.AppendEmpty() record.Body().SetStr("sample body value") record.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) record.SetSeverityNumber(plog.SeverityNumberFatal) record.SetSeverityText("fatal") stats.LogsSent++ return logger.Export(ctx, logs) } type otlpExporters struct { cleanup func(context.Context) error trace *otlptrace.Exporter log otlplogExporter } func newOTLPExporters(ctx context.Context, endpointURL *url.URL, cfg Config) (*otlpExporters, error) { switch cfg.otlpProtocol { case "grpc": return newOTLPGRPCExporters(ctx, endpointURL, cfg) case "http/protobuf": return newOTLPHTTPExporters(ctx, endpointURL, cfg) default: return nil, fmt.Errorf("invalid protocol %q", cfg.otlpProtocol) } } func newOTLPGRPCExporters(ctx context.Context, endpointURL *url.URL, cfg Config) (*otlpExporters, error) { var transportCredentials credentials.TransportCredentials switch endpointURL.Scheme { case "http": // If http:// is specified, then use insecure (plaintext). transportCredentials = grpcinsecure.NewCredentials() case "https": transportCredentials = credentials.NewTLS(&tls.Config{InsecureSkipVerify: cfg.insecure}) } grpcConn, err := grpc.NewClient( endpointURL.Host, grpc.WithTransportCredentials(transportCredentials), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")), ) if err != nil { return nil, err } cleanup := func(context.Context) error { return grpcConn.Close() } traceOptions := []otlptracegrpc.Option{otlptracegrpc.WithGRPCConn(grpcConn)} var logHeaders map[string]string headers := map[string]string{"Authorization": "ApiKey " + cfg.apiKey} traceOptions = append(traceOptions, otlptracegrpc.WithHeaders(headers)) logHeaders = headers otlpTraceExporter, err := otlptracegrpc.New(ctx, traceOptions...) if err != nil { cleanup(ctx) return nil, err } cleanup = combineCleanup(otlpTraceExporter.Shutdown, cleanup) return &otlpExporters{ cleanup: cleanup, trace: otlpTraceExporter, log: &otlploggrpcExporter{ client: plogotlp.NewGRPCClient(grpcConn), headers: logHeaders, }, }, nil } func newOTLPHTTPExporters(ctx context.Context, endpointURL *url.URL, cfg Config) (*otlpExporters, error) { tlsConfig := &tls.Config{InsecureSkipVerify: cfg.insecure} traceOptions := []otlptracehttp.Option{ otlptracehttp.WithEndpoint(endpointURL.Host), otlptracehttp.WithTLSClientConfig(tlsConfig), } if endpointURL.Scheme == "http" { traceOptions = append(traceOptions, otlptracehttp.WithInsecure()) } headers := map[string]string{"Authorization": "ApiKey " + cfg.apiKey} traceOptions = append(traceOptions, otlptracehttp.WithHeaders(headers)) cleanup := func(context.Context) error { return nil } otlpTraceExporter, err := otlptracehttp.New(ctx, traceOptions...) if err != nil { cleanup(ctx) return nil, err } cleanup = combineCleanup(otlpTraceExporter.Shutdown, cleanup) return &otlpExporters{ cleanup: cleanup, trace: otlpTraceExporter, log: &otlploghttpExporter{}, }, nil } func combineCleanup(a, b func(context.Context) error) func(context.Context) error { return func(ctx context.Context) error { if err := a(ctx); err != nil { return err } return b(ctx) } } type otlplogExporter interface { Export(ctx context.Context, logs plog.Logs) error } // otlploggrpcExporter is a simple synchronous log exporter using GRPC type otlploggrpcExporter struct { client plogotlp.GRPCClient headers map[string]string } func (e *otlploggrpcExporter) Export(ctx context.Context, logs plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(logs) md := metadata.New(e.headers) ctx = metadata.NewOutgoingContext(ctx, md) _, err := e.client.Export(ctx, req) if err != nil { return err } // TODO: parse response for error return nil } // otlploghttpExporter is a simple synchronous log exporter using protobuf over HTTP type otlploghttpExporter struct { } func (e *otlploghttpExporter) Export(ctx context.Context, logs plog.Logs) error { // TODO: implement return errors.New("otlploghttpExporter isn't implemented") } func SetOTLPTracePropagator(ctx context.Context, traceparent string, tracestate string) context.Context { m := propagation.MapCarrier{} m.Set("traceparent", traceparent) m.Set("tracestate", tracestate) tc := propagation.TraceContext{} // Register the TraceContext propagator globally. otel.SetTextMapPropagator(tc) return tc.Extract(ctx, m) }