exporter/collector/traces.go (86 lines of code) (raw):

// Copyright 2021 OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package collector contains the wrapper for OpenTelemetry-GoogleCloud // exporter to be used in opentelemetry-collector. package collector import ( "context" "errors" "fmt" "time" traceapi "cloud.google.com/go/trace/apiv2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" ) // TraceExporter is a wrapper struct of OT cloud trace exporter. type TraceExporter struct { obs selfObservability texporter *texporter.Exporter cfg Config timeout time.Duration } func (te *TraceExporter) Shutdown(ctx context.Context) error { if te.texporter != nil { return te.texporter.Shutdown(ctx) } return nil } func NewGoogleCloudTracesExporter( ctx context.Context, cfg Config, set exporter.Settings, timeout time.Duration, ) (*TraceExporter, error) { SetUserAgent(&cfg, set.BuildInfo) obs := selfObservability{ log: set.TelemetrySettings.Logger, meterProvider: set.TelemetrySettings.MeterProvider, } return &TraceExporter{cfg: cfg, timeout: timeout, obs: obs}, nil } func (te *TraceExporter) Start(ctx context.Context, _ component.Host) error { topts := []texporter.Option{ texporter.WithProjectID(te.cfg.ProjectID), texporter.WithTimeout(te.timeout), } if te.cfg.DestinationProjectQuota { topts = append(topts, texporter.WithDestinationProjectQuota()) } if te.cfg.TraceConfig.AttributeMappings != nil { topts = append(topts, texporter.WithAttributeMapping(mappingFuncFromAKM(te.cfg.TraceConfig.AttributeMappings))) } copts, err := generateClientOptions(ctx, &te.cfg.TraceConfig.ClientConfig, &te.cfg, traceapi.DefaultAuthScopes(), te.obs.meterProvider) if err != nil { return err } topts = append(topts, texporter.WithTraceClientOptions(copts)) exp, err := texporter.New(topts...) if err != nil { return fmt.Errorf("error creating GoogleCloud Trace exporter: %w", err) } te.texporter = exp return nil } func mappingFuncFromAKM(akm []AttributeMapping) func(attribute.Key) attribute.Key { // convert list to map for easy lookups mapFromConfig := make(map[string]string, len(akm)) for _, mapping := range akm { mapFromConfig[mapping.Key] = mapping.Replacement } return func(input attribute.Key) attribute.Key { // if a replacement was specified in the config, use it. if replacement, ok := mapFromConfig[string(input)]; ok { return attribute.Key(replacement) } // otherwise, leave the attribute as-is return input } } // PushTraces calls texporter.ExportSpan for each span in the given traces. func (te *TraceExporter) PushTraces(ctx context.Context, td ptrace.Traces) error { if te.texporter == nil { return errors.New("not started") } resourceSpans := td.ResourceSpans() spans := make([]sdktrace.ReadOnlySpan, 0, td.SpanCount()) for i := 0; i < resourceSpans.Len(); i++ { sd := pdataResourceSpansToOTSpanData(resourceSpans.At(i)) spans = append(spans, sd...) } return te.texporter.ExportSpans(ctx, spans) }