receiver/opencensusreceiver/opencensus.go (169 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package opencensusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver"
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/rs/cors"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver/internal/ocmetrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver/internal/octrace"
)
// ocReceiver receives OpenCensus traces and metrics. A single listener is
// multiplexed between a gRPC server and an HTTP server whose handler is a
// grpc-gateway mux.
type ocReceiver struct {
	// Configuration and settings handed in at construction time.
	cfg                *Config
	settings           receiver.Settings
	grpcServerSettings configgrpc.ServerConfig
	// corsOrigins lists the allowed CORS origins; Start only installs the
	// CORS handler when it is non-empty.
	corsOrigins []string

	// Downstream consumers. Start fails when both are nil.
	traceConsumer   consumer.Traces
	metricsConsumer consumer.Metrics

	// gRPC service implementations, created by Start for each non-nil consumer.
	traceReceiver   *octrace.Receiver
	metricsReceiver *ocmetrics.Receiver

	// Network plumbing: the bound listener, the multiplexer layered on top of
	// it, and the two servers fed by the multiplexer.
	ln          net.Listener
	multiplexer cmux.CMux
	serverGRPC  *grpc.Server
	serverHTTP  *http.Server
	gatewayMux  *gatewayruntime.ServeMux

	// cancel cancels the context used for grpc-gateway registration.
	cancel context.CancelFunc
	// stopWG tracks the goroutine that runs the gRPC server.
	stopWG sync.WaitGroup
}
// newOpenCensusReceiver builds an ocReceiver from the given configuration,
// consumers and settings, and then applies opts in the order supplied. It
// does not open a listener or serve anything; the caller is responsible for
// calling Start to begin reception and Shutdown to end it.
func newOpenCensusReceiver(
	cfg *Config,
	tc consumer.Traces,
	mc consumer.Metrics,
	settings receiver.Settings,
	opts ...ocOption,
) *ocReceiver {
	rcv := new(ocReceiver)
	rcv.cfg = cfg
	rcv.settings = settings
	rcv.traceConsumer = tc
	rcv.metricsConsumer = mc
	rcv.gatewayMux = gatewayruntime.NewServeMux()
	// CORS is disabled by default: Start only wraps the handler when this
	// list is non-empty.
	rcv.corsOrigins = []string{}

	for i := range opts {
		opts[i].withReceiver(rcv)
	}
	return rcv
}
// Start builds the gRPC and HTTP servers, registers the trace and/or metrics
// services for whichever consumers are set, binds the configured address and
// serves gRPC and HTTP/JSON (grpc-gateway) on that single listener.
//
// An error is returned when the gRPC server cannot be created, when a
// trace/metrics receiver cannot be constructed, when no consumer was
// specified, when the address cannot be bound, or when registering the
// grpc-gateway handlers fails. Unexpected errors from the serving goroutines
// are reported to host as fatal error events via componentstatus.
func (ocr *ocReceiver) Start(ctx context.Context, host component.Host) error {
	var err error
	ocr.serverGRPC, err = ocr.grpcServerSettings.ToServer(ctx, host, ocr.settings.TelemetrySettings)
	if err != nil {
		return err
	}

	// The HTTP server exposes the grpc-gateway mux, optionally wrapped with CORS.
	var mux http.Handler = ocr.gatewayMux
	if len(ocr.corsOrigins) > 0 {
		co := cors.Options{AllowedOrigins: ocr.corsOrigins}
		mux = cors.New(co).Handler(mux)
	}
	ocr.serverHTTP = &http.Server{Handler: mux, ReadHeaderTimeout: 20 * time.Second}

	// Register a gRPC service for every consumer that was provided.
	hasConsumer := false
	if ocr.traceConsumer != nil {
		hasConsumer = true
		ocr.traceReceiver, err = octrace.New(ocr.traceConsumer, ocr.settings)
		if err != nil {
			return err
		}
		agenttracepb.RegisterTraceServiceServer(ocr.serverGRPC, ocr.traceReceiver)
	}
	if ocr.metricsConsumer != nil {
		hasConsumer = true
		ocr.metricsReceiver, err = ocmetrics.New(ocr.metricsConsumer, ocr.settings)
		if err != nil {
			return err
		}
		agentmetricspb.RegisterMetricsServiceServer(ocr.serverGRPC, ocr.metricsReceiver)
	}
	if !hasConsumer {
		return errors.New("cannot start receiver: no consumers were specified")
	}

	ocr.ln, err = net.Listen(string(ocr.cfg.NetAddr.Transport), ocr.cfg.NetAddr.Endpoint)
	if err != nil {
		return fmt.Errorf("failed to bind to address %q: %w", ocr.cfg.NetAddr.Endpoint, err)
	}

	// The gateway registration below uses its own context, cancelled through
	// ocr.cancel, rather than the ctx passed to Start.
	var c context.Context
	c, ocr.cancel = context.WithCancel(context.Background())

	// The gateway dials back into the listener's actual address; unix
	// sockets need the "unix:" scheme prefix.
	endpoint := ocr.ln.Addr().String()
	_, ok := ocr.ln.(*net.UnixListener)
	if ok {
		endpoint = "unix:" + endpoint
	}

	// Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port.
	ocr.multiplexer = cmux.New(ocr.ln)
	grpcL := ocr.multiplexer.MatchWithWriters(
		cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
		cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"))
	httpL := ocr.multiplexer.Match(cmux.Any())

	// Only the gRPC goroutine is tracked by stopWG. startWG merely ensures
	// that all three goroutines have begun running before Start continues;
	// it does not guarantee that the servers are already accepting.
	ocr.stopWG.Add(1)
	startWG := sync.WaitGroup{}
	startWG.Add(3)
	go func() {
		defer ocr.stopWG.Done()
		startWG.Done()
		// cmux.ErrServerClosed is tolerated as well: during shutdown the
		// gRPC server may not be fully stopped before the cmux is closed.
		if err := ocr.serverGRPC.Serve(grpcL); err != nil && !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, cmux.ErrServerClosed) {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()
	go func() {
		startWG.Done()
		if err := ocr.serverHTTP.Serve(httpL); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrServerClosed) {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()
	go func() {
		startWG.Done()
		if err := ocr.multiplexer.Serve(); err != nil && !errors.Is(err, net.ErrClosed) {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
		}
	}()
	startWG.Wait()

	// Register the grpc-gateway handlers on the HTTP server mux.
	// NOTE(review): if either registration fails, the goroutines started
	// above keep running until Shutdown is called.
	opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
	if err := agenttracepb.RegisterTraceServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil {
		return err
	}
	if err := agentmetricspb.RegisterMetricsServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil {
		return err
	}

	// At this point we've successfully started all the services/receivers.
	// Add other start routines here.
	return nil
}
// Shutdown turns off reception. It cancels the context created in Start,
// stops the gRPC server and waits for its serving goroutine to finish, then
// closes the HTTP server, the listener and the multiplexer. Errors from the
// close calls are discarded, the consumers are cleared, and the method always
// returns nil. The context argument is not used.
func (ocr *ocReceiver) Shutdown(context.Context) error {
	if stop := ocr.cancel; stop != nil {
		stop()
	}

	if grpcSrv := ocr.serverGRPC; grpcSrv != nil {
		grpcSrv.Stop()
		// Wait for the goroutine running the gRPC server to return.
		ocr.stopWG.Wait()
	}

	if httpSrv := ocr.serverHTTP; httpSrv != nil {
		_ = httpSrv.Close()
	}

	if listener := ocr.ln; listener != nil {
		_ = listener.Close()
	}

	if mux := ocr.multiplexer; mux != nil {
		mux.Close()
	}

	ocr.traceConsumer, ocr.metricsConsumer = nil, nil
	return nil
}