exporter/opencensusexporter/opencensus.go (199 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package opencensusexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter" import ( "context" "errors" "fmt" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" ) // See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream // why we need to keep the cancel func to cancel the stream type tracesClientWithCancel struct { cancel context.CancelFunc tsec agenttracepb.TraceService_ExportClient } // See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream // why we need to keep the cancel func to cancel the stream type metricsClientWithCancel struct { cancel context.CancelFunc msec agentmetricspb.MetricsService_ExportClient } type ocExporter struct { cfg *Config // gRPC clients and connection. traceSvcClient agenttracepb.TraceServiceClient metricsSvcClient agentmetricspb.MetricsServiceClient // In any of the channels we keep always NumWorkers object (sometimes nil), // to make sure we don't open more than NumWorkers RPCs at any moment. tracesClients chan *tracesClientWithCancel metricsClients chan *metricsClientWithCancel grpcClientConn *grpc.ClientConn metadata metadata.MD settings component.TelemetrySettings } func newOcExporter(_ context.Context, cfg *Config, settings component.TelemetrySettings) (*ocExporter, error) { if cfg.Endpoint == "" { return nil, errors.New("OpenCensus exporter cfg requires an Endpoint") } if cfg.NumWorkers <= 0 { return nil, errors.New("OpenCensus exporter cfg requires at least one worker") } oce := &ocExporter{ cfg: cfg, metadata: metadata.New(nil), settings: settings, } for k, v := range cfg.Headers { oce.metadata.Set(k, string(v)) } return oce, nil } // start creates the gRPC client Connection func (oce *ocExporter) start(ctx context.Context, host component.Host) error { clientConn, err := oce.cfg.ToClientConn(ctx, host, oce.settings) if err != nil { return err } oce.grpcClientConn = clientConn if oce.tracesClients != nil { oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn) // Try to create rpc clients now. for i := 0; i < oce.cfg.NumWorkers; i++ { // Populate the channel with NumWorkers nil RPCs to keep the number of workers // constant in the channel. oce.tracesClients <- nil } } if oce.metricsClients != nil { oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn) // Try to create rpc clients now. for i := 0; i < oce.cfg.NumWorkers; i++ { // Populate the channel with NumWorkers nil RPCs to keep the number of workers // constant in the channel. oce.metricsClients <- nil } } return nil } func (oce *ocExporter) shutdown(context.Context) error { if oce.grpcClientConn == nil { return nil } if oce.tracesClients != nil { // First remove all the clients from the channel. for i := 0; i < oce.cfg.NumWorkers; i++ { <-oce.tracesClients } // Now close the channel close(oce.tracesClients) } if oce.metricsClients != nil { // First remove all the clients from the channel. for i := 0; i < oce.cfg.NumWorkers; i++ { <-oce.metricsClients } // Now close the channel close(oce.metricsClients) } return oce.grpcClientConn.Close() } func newTracesExporter(ctx context.Context, cfg *Config, settings component.TelemetrySettings) (*ocExporter, error) { oce, err := newOcExporter(ctx, cfg, settings) if err != nil { return nil, err } oce.tracesClients = make(chan *tracesClientWithCancel, oce.cfg.NumWorkers) return oce, nil } func newMetricsExporter(ctx context.Context, cfg *Config, settings component.TelemetrySettings) (*ocExporter, error) { oce, err := newOcExporter(ctx, cfg, settings) if err != nil { return nil, err } oce.metricsClients = make(chan *metricsClientWithCancel, oce.cfg.NumWorkers) return oce, nil } func (oce *ocExporter) pushTraces(_ context.Context, td ptrace.Traces) error { // Get first available trace Client. tClient, ok := <-oce.tracesClients if !ok { err := errors.New("failed to push traces, OpenCensus exporter was already stopped") return err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), // to make sure we don't open more than NumWorkers RPCs at any moment. // Here check if the client is nil and create a new one if that is the case. A nil // object means that an error happened: could not connect, service went down, etc. if tClient == nil { var err error tClient, err = oce.createTraceServiceRPC() if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.tracesClients <- nil return err } } rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { node, resource, spans := opencensus.ResourceSpansToOC(rss.At(i)) // This is a hack because OC protocol expects a Node for the initial message. if node == nil { node = &commonpb.Node{} } if resource == nil { resource = &resourcepb.Resource{} } req := &agenttracepb.ExportTraceServiceRequest{ Spans: spans, Resource: resource, Node: node, } if err := tClient.tsec.Send(req); err != nil { // Error received, cancel the context used to create the RPC to free all resources, // put back nil to keep the number of workers constant. tClient.cancel() oce.tracesClients <- nil return err } } oce.tracesClients <- tClient return nil } func (oce *ocExporter) pushMetrics(_ context.Context, md pmetric.Metrics) error { // Get first available mClient. mClient, ok := <-oce.metricsClients if !ok { err := errors.New("failed to push metrics, OpenCensus exporter was already stopped") return err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), // to make sure we don't open more than NumWorkers RPCs at any moment. // Here check if the client is nil and create a new one if that is the case. A nil // object means that an error happened: could not connect, service went down, etc. if mClient == nil { var err error mClient, err = oce.createMetricsServiceRPC() if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.metricsClients <- nil return err } } rms := md.ResourceMetrics() for i := 0; i < rms.Len(); i++ { ocReq := agentmetricspb.ExportMetricsServiceRequest{} ocReq.Node, ocReq.Resource, ocReq.Metrics = opencensus.ResourceMetricsToOC(rms.At(i)) // This is a hack because OC protocol expects a Node for the initial message. if ocReq.Node == nil { ocReq.Node = &commonpb.Node{} } if ocReq.Resource == nil { ocReq.Resource = &resourcepb.Resource{} } if err := mClient.msec.Send(&ocReq); err != nil { // Error received, cancel the context used to create the RPC to free all resources, // put back nil to keep the number of workers constant. mClient.cancel() oce.metricsClients <- nil return err } } oce.metricsClients <- mClient return nil } func (oce *ocExporter) createTraceServiceRPC() (*tracesClientWithCancel, error) { // Initiate the trace service by sending over node identifier info. ctx, cancel := context.WithCancel(context.Background()) if len(oce.cfg.Headers) > 0 { ctx = metadata.NewOutgoingContext(ctx, oce.metadata.Copy()) } // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. traceClient, err := oce.traceSvcClient.Export(ctx) if err != nil { cancel() return nil, fmt.Errorf("TraceServiceClient: %w", err) } return &tracesClientWithCancel{cancel: cancel, tsec: traceClient}, nil } func (oce *ocExporter) createMetricsServiceRPC() (*metricsClientWithCancel, error) { // Initiate the trace service by sending over node identifier info. ctx, cancel := context.WithCancel(context.Background()) if len(oce.cfg.Headers) > 0 { ctx = metadata.NewOutgoingContext(ctx, oce.metadata.Copy()) } // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. metricsClient, err := oce.metricsSvcClient.Export(ctx) if err != nil { cancel() return nil, fmt.Errorf("MetricsServiceClient: %w", err) } return &metricsClientWithCancel{cancel: cancel, msec: metricsClient}, nil }