exporter/sumologicexporter/exporter.go (384 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" import ( "context" "errors" "fmt" "net/http" "net/url" "path" "strings" "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension" ) const ( logsDataURL = "/api/v1/collector/logs" metricsDataURL = "/api/v1/collector/metrics" tracesDataURL = "/api/v1/collector/traces" ) type sumologicexporter struct { config *Config host component.Host logger *zap.Logger clientLock sync.RWMutex client *http.Client prometheusFormatter prometheusFormatter // Lock around data URLs is needed because the reconfiguration of the exporter // can happen asynchronously whenever the exporter is re registering. dataURLsLock sync.RWMutex dataURLMetrics string dataURLLogs string dataURLTraces string foundSumologicExtension bool sumologicExtension *sumologicextension.SumologicExtension stickySessionCookieLock sync.RWMutex stickySessionCookie string id component.ID sender *sender telemetryBuilder *metadata.TelemetryBuilder } func initExporter(cfg *Config, set exporter.Settings) (*sumologicexporter, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) if err != nil { return nil, err } se := &sumologicexporter{ config: cfg, logger: set.Logger, // NOTE: client is now set in start() prometheusFormatter: newPrometheusFormatter(), id: set.ID, foundSumologicExtension: false, telemetryBuilder: telemetryBuilder, } se.logger.Info( "Sumo Logic Exporter configured", zap.String("log_format", string(cfg.LogFormat)), zap.String("metric_format", string(cfg.MetricFormat)), ) return se, nil } func newLogsExporter( ctx context.Context, params exporter.Settings, cfg *Config, ) (exporter.Logs, error) { se, err := initExporter(cfg, params) if err != nil { return nil, err } return exporterhelper.NewLogs( ctx, params, cfg, se.pushLogsData, // Disable exporterhelper Timeout, since we are using a custom mechanism // within exporter itself exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithRetry(cfg.BackOffConfig), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(se.start), exporterhelper.WithShutdown(se.shutdown), ) } func newMetricsExporter( ctx context.Context, params exporter.Settings, cfg *Config, ) (exporter.Metrics, error) { se, err := initExporter(cfg, params) if err != nil { return nil, err } return exporterhelper.NewMetrics( ctx, params, cfg, se.pushMetricsData, // Disable exporterhelper Timeout, since we are using a custom mechanism // within exporter itself exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithRetry(cfg.BackOffConfig), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(se.start), exporterhelper.WithShutdown(se.shutdown), ) } func newTracesExporter( ctx context.Context, params exporter.Settings, cfg *Config, ) (exporter.Traces, error) { se, err := initExporter(cfg, params) if err != nil { return nil, err } return exporterhelper.NewTraces( ctx, params, cfg, se.pushTracesData, // Disable exporterhelper Timeout, since we are using a custom mechanism // within exporter itself exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithRetry(cfg.BackOffConfig), exporterhelper.WithQueue(cfg.QueueSettings), exporterhelper.WithStart(se.start), exporterhelper.WithShutdown(se.shutdown), ) } // start starts the exporter func (se *sumologicexporter) start(ctx context.Context, host component.Host) (err error) { se.host = host return se.configure(ctx) } func (se *sumologicexporter) configure(ctx context.Context) error { var ( ext *sumologicextension.SumologicExtension foundSumoExt bool ) httpSettings := se.config.ClientConfig for _, e := range se.host.GetExtensions() { v, ok := e.(*sumologicextension.SumologicExtension) if ok && httpSettings.Auth.AuthenticatorID == v.ComponentID() { ext = v foundSumoExt = true se.foundSumologicExtension = true se.sumologicExtension = ext break } } switch { case httpSettings.Endpoint == "" && httpSettings.Auth != nil && httpSettings.Auth.AuthenticatorID.Type() == sumologicextension.NewFactory().Type(): // If user specified using sumologicextension as auth but none was // found then return an error. if !foundSumoExt { return fmt.Errorf( "sumologic was specified as auth extension (named: %q) but "+ "a matching extension was not found in the config, "+ "please re-check the config and/or define the sumologicextension", httpSettings.Auth.AuthenticatorID.String(), ) } // If we're using sumologicextension as authentication extension and // endpoint was not set then send data on a collector generic ingest URL // with authentication set by sumologicextension. u, err := url.Parse(ext.BaseURL()) if err != nil { return fmt.Errorf("failed to parse API base URL from sumologicextension: %w", err) } logsURL := *u logsURL.Path = logsDataURL metricsURL := *u metricsURL.Path = metricsDataURL tracesURL := *u tracesURL.Path = tracesDataURL se.setDataURLs(logsURL.String(), metricsURL.String(), tracesURL.String()) case httpSettings.Endpoint != "": logsURL, err := getSignalURL(se.config, httpSettings.Endpoint, pipeline.SignalLogs) if err != nil { return err } metricsURL, err := getSignalURL(se.config, httpSettings.Endpoint, pipeline.SignalMetrics) if err != nil { return err } tracesURL, err := getSignalURL(se.config, httpSettings.Endpoint, pipeline.SignalTraces) if err != nil { return err } se.setDataURLs(logsURL, metricsURL, tracesURL) // Clean authenticator if set to sumologic. // Setting to null in configuration doesn't work, so we have to force it that way. if httpSettings.Auth != nil && httpSettings.Auth.AuthenticatorID.Type() == sumologicextension.NewFactory().Type() { httpSettings.Auth = nil } default: return fmt.Errorf("no auth extension and no endpoint specified") } client, err := httpSettings.ToClient(ctx, se.host, componenttest.NewNopTelemetrySettings()) if err != nil { return fmt.Errorf("failed to create HTTP Client: %w", err) } se.setHTTPClient(client) logsURL, metricsURL, tracesURL := se.getDataURLs() se.sender = newSender( se.logger, se.config, se.getHTTPClient(), se.prometheusFormatter, metricsURL, logsURL, tracesURL, se.StickySessionCookie, se.SetStickySessionCookie, se.id, se.telemetryBuilder, ) return nil } func (se *sumologicexporter) setHTTPClient(client *http.Client) { se.clientLock.Lock() se.client = client se.clientLock.Unlock() } func (se *sumologicexporter) getHTTPClient() *http.Client { se.clientLock.RLock() defer se.clientLock.RUnlock() return se.client } func (se *sumologicexporter) setDataURLs(logs, metrics, traces string) { se.dataURLsLock.Lock() se.logger.Info("setting data urls", zap.String("logs_url", sanitizeURL(logs)), zap.String("metrics_url", sanitizeURL(metrics)), zap.String("traces_url", sanitizeURL(traces))) se.dataURLLogs, se.dataURLMetrics, se.dataURLTraces = logs, metrics, traces se.dataURLsLock.Unlock() } func (se *sumologicexporter) getDataURLs() (logs, metrics, traces string) { se.dataURLsLock.RLock() defer se.dataURLsLock.RUnlock() return se.dataURLLogs, se.dataURLMetrics, se.dataURLTraces } func (se *sumologicexporter) shutdown(context.Context) error { return nil } // pushLogsData groups data with common metadata and sends them as separate batched requests. // It returns the number of unsent logs and an error which contains a list of dropped records // so they can be handled by OTC retry mechanism func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) error { // Follow different execution path for OTLP format if se.sender.config.LogFormat == OTLPLogFormat { if err := se.sender.sendOTLPLogs(ctx, ld); err != nil { se.handleUnauthorizedErrors(ctx, err) return consumererror.NewLogs(err, ld) } return nil } type droppedResourceRecords struct { resource pcommon.Resource records []plog.LogRecord } var ( errs []error dropped []droppedResourceRecords ) // Iterate over ResourceLogs rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { rl := rls.At(i) currentMetadata := newFields(rl.Resource().Attributes()) if droppedRecords, err := se.sender.sendNonOTLPLogs(ctx, rl, currentMetadata); err != nil { dropped = append(dropped, droppedResourceRecords{ resource: rl.Resource(), records: droppedRecords, }) errs = append(errs, err) } } if len(dropped) > 0 { ld = plog.NewLogs() // Copy all dropped records to Logs // NOTE: we only copy resource and log records here. // Scope is not handled properly but it never was. for i := range dropped { rls := ld.ResourceLogs().AppendEmpty() dropped[i].resource.CopyTo(rls.Resource()) for j := 0; j < len(dropped[i].records); j++ { dropped[i].records[j].CopyTo( rls.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty(), ) } } errs = deduplicateErrors(errs) se.handleUnauthorizedErrors(ctx, errs...) return consumererror.NewLogs(errors.Join(errs...), ld) } return nil } // pushMetricsData groups data with common metadata and send them as separate batched requests // it returns number of unsent metrics and error which contains list of dropped records // so they can be handle by the OTC retry mechanism func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error { var droppedMetrics pmetric.Metrics var errs []error if se.sender.config.MetricFormat == OTLPMetricFormat { if err := se.sender.sendOTLPMetrics(ctx, md); err != nil { droppedMetrics = md errs = []error{err} } } else { droppedMetrics, errs = se.sender.sendNonOTLPMetrics(ctx, md) } if len(errs) > 0 { se.handleUnauthorizedErrors(ctx, errs...) return consumererror.NewMetrics(errors.Join(errs...), droppedMetrics) } return nil } // handleUnauthorizedErrors checks if any of the provided errors is an unauthorized error. // In which case it triggers exporter reconfiguration which in turn takes the credentials // from sumologicextension which at this point should already detect the problem with // authorization (via heartbeats) and prepare new collector credentials to be available. func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs ...error) { for _, err := range errs { if errors.Is(err, errUnauthorized) { se.logger.Warn("Received unauthorized status code, triggering reconfiguration") errC := se.configure(ctx) if errC == nil { // It's enough to successfully reconfigure the exporter just once. return } se.logger.Error("Error configuring the exporter with new credentials", zap.Error(err)) } } } func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Traces) error { err := se.sender.sendTraces(ctx, td) se.handleUnauthorizedErrors(ctx, err) return err } func (se *sumologicexporter) StickySessionCookie() string { if se.foundSumologicExtension { return se.sumologicExtension.StickySessionCookie() } se.stickySessionCookieLock.RLock() defer se.stickySessionCookieLock.RUnlock() return se.stickySessionCookie } func (se *sumologicexporter) SetStickySessionCookie(stickySessionCookie string) { if se.foundSumologicExtension { se.sumologicExtension.SetStickySessionCookie(stickySessionCookie) return } se.stickySessionCookieLock.Lock() se.stickySessionCookie = stickySessionCookie se.stickySessionCookieLock.Unlock() } // get the destination url for a given signal type // this mostly adds signal-specific suffixes if the format is otlp func getSignalURL(oCfg *Config, endpointURL string, signal pipeline.Signal) (string, error) { url, err := url.Parse(endpointURL) if err != nil { return "", err } switch signal { case pipeline.SignalLogs: if oCfg.LogFormat != "otlp" { return url.String(), nil } case pipeline.SignalMetrics: if oCfg.MetricFormat != "otlp" { return url.String(), nil } case pipeline.SignalTraces: default: return "", fmt.Errorf("unknown signal type: %s", signal) } signalURLSuffix := fmt.Sprintf("/v1/%s", signal) if !strings.HasSuffix(url.Path, signalURLSuffix) { url.Path = path.Join(url.Path, signalURLSuffix) } return url.String(), nil } func sanitizeURL(urlString string) string { strBefore := "otlp/" strAfter := "/v1/" leftIndex := strings.Index(urlString, strBefore) rightIndex := strings.LastIndex(urlString, strAfter) if leftIndex == -1 || rightIndex == -1 { return urlString } length := len(strBefore) checkSensitiveStrLen := (rightIndex - leftIndex) - length if checkSensitiveStrLen > 0 { s1 := urlString[0 : leftIndex+len(strBefore)] s2 := nchars('*', (rightIndex - leftIndex - length)) s3 := urlString[rightIndex:] sanitizedStr := strings.Join([]string{s1, s2, s3}, "") return sanitizedStr } return urlString } func nchars(b byte, n int) string { s := make([]byte, n) for i := range n { s[i] = b } return string(s) }