exporter/splunkhecexporter/client.go (549 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" import ( "bytes" "context" "errors" "fmt" "net/http" "net/url" "sync" "github.com/goccy/go-json" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" ) // allow monkey patching for injecting pushLogData function in test var getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error { return c.pushLogData } // iterState captures a state of iteration over the pdata Logs/Metrics/Traces instances. type iterState struct { resource int // index in ResourceLogs/ResourceMetrics/ResourceSpans list library int // index in ScopeLogs/ScopeMetrics/ScopeSpans list record int // index in Logs/Metrics/Spans list done bool } func (s iterState) empty() bool { return s.resource == 0 && s.library == 0 && s.record == 0 } // client sends the data to the splunk backend. type client struct { config *Config logger *zap.Logger wg sync.WaitGroup telemetrySettings component.TelemetrySettings hecWorker hecWorker buildInfo component.BuildInfo heartbeater *heartbeater bufferPool bufferPool exporterName string meter metric.Meter } func newClient(set exporter.Settings, cfg *Config, maxContentLength uint) *client { return &client{ config: cfg, logger: set.Logger, telemetrySettings: set.TelemetrySettings, buildInfo: set.BuildInfo, bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression), exporterName: set.ID.String(), meter: metadata.Meter(set.TelemetrySettings), } } func newLogsClient(set exporter.Settings, cfg *Config) *client { return newClient(set, cfg, cfg.MaxContentLengthLogs) } func newTracesClient(set exporter.Settings, cfg *Config) *client { return newClient(set, cfg, cfg.MaxContentLengthTraces) } func newMetricsClient(set exporter.Settings, cfg *Config) *client { return newClient(set, cfg, cfg.MaxContentLengthMetrics) } func (c *client) pushMetricsData( ctx context.Context, md pmetric.Metrics, ) error { c.wg.Add(1) defer c.wg.Done() localHeaders := map[string]string{} if md.ResourceMetrics().Len() != 0 { accessToken, found := md.ResourceMetrics().At(0).Resource().Attributes().Get(splunk.HecTokenLabel) if found { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } if c.config.UseMultiMetricFormat { return c.pushMultiMetricsDataInBatches(ctx, md, localHeaders) } return c.pushMetricsDataInBatches(ctx, md, localHeaders) } func (c *client) pushTraceData( ctx context.Context, td ptrace.Traces, ) error { c.wg.Add(1) defer c.wg.Done() localHeaders := map[string]string{} if td.ResourceSpans().Len() != 0 { accessToken, found := td.ResourceSpans().At(0).Resource().Attributes().Get(splunk.HecTokenLabel) if found { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } return c.pushTracesDataInBatches(ctx, td, localHeaders) } func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error { c.wg.Add(1) defer c.wg.Done() if ld.ResourceLogs().Len() == 0 { return nil } localHeaders := map[string]string{} // All logs in a batch have the same access token after batchperresourceattr, so we can just check the first one. accessToken, found := ld.ResourceLogs().At(0).Resource().Attributes().Get(splunk.HecTokenLabel) if found { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } // All logs in a batch have only one type (regular or profiling logs) after perScopeBatcher, // so we can just check the first one. for i := 0; i < ld.ResourceLogs().Len(); i++ { sls := ld.ResourceLogs().At(i).ScopeLogs() if sls.Len() > 0 { if isProfilingData(sls.At(0)) { localHeaders[libraryHeaderName] = profilingLibraryName } break } } return c.pushLogDataInBatches(ctx, ld, localHeaders) } // A guesstimated value > length of bytes of a single event. // Added to buffer capacity so that buffer is likely to grow by reslicing when buf.Len() > bufCap. const ( bufCapPadding = uint(4096) libraryHeaderName = "X-Splunk-Instrumentation-Library" profilingLibraryName = "otel.profiling" ) func isProfilingData(sl plog.ScopeLogs) bool { return sl.Scope().Name() == profilingLibraryName } // pushLogDataInBatches sends batches of Splunk events in JSON format. // The batch content length is restricted to MaxContentLengthLogs. // ld log records are parsed to Splunk events. func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error { buf := c.bufferPool.get() defer c.bufferPool.put(buf) is := iterState{} var permanentErrors []error for !is.done { buf.Reset() latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { return consumererror.NewLogs(err, subLogs(ld, is)) } } is = latestIterState } return multierr.Combine(permanentErrors...) } // fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed. func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) { var b []byte var permanentErrors []error for i := is.resource; i < logs.ResourceLogs().Len(); i++ { rl := logs.ResourceLogs().At(i) for j := is.library; j < rl.ScopeLogs().Len(); j++ { is.library = 0 // Reset library index for next resource. sl := rl.ScopeLogs().At(j) for k := is.record; k < sl.LogRecords().Len(); k++ { is.record = 0 // Reset record index for next library. logRecord := sl.LogRecords().At(k) if c.config.ExportRaw { b = []byte(logRecord.Body().AsString() + "\n") } else { // Parsing log record to Splunk event. event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config) if event == nil { // TODO record this drop as a metric continue } // JSON encoding event and writing to buffer. var err error b, err = marshalEvent(event, c.config.MaxEventSize) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "dropped log event: %v, error: %w", event, err))) continue } } // Continue adding events to buffer up to capacity. _, err := buf.Write(b) if err == nil { continue } if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthLogs))) return iterState{i, j, k + 1, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", err))) } } } return iterState{done: true}, permanentErrors } func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { rm := metrics.ResourceMetrics().At(i) for j := is.library; j < rm.ScopeMetrics().Len(); j++ { is.library = 0 // Reset library index for next resource. sm := rm.ScopeMetrics().At(j) for k := is.record; k < sm.Metrics().Len(); k++ { is.record = 0 // Reset record index for next library. metric := sm.Metrics().At(k) // Parsing metric record to Splunk event. events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) tempBuf.Reset() for _, event := range events { // JSON encoding event and writing to buffer. b, err := marshalEvent(event, c.config.MaxEventSize) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) continue } tempBuf.Write(b) } // Continue adding events to buffer up to capacity. b := tempBuf.Bytes() _, err := buf.Write(b) if err == nil { continue } if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{i, j, k + 1, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) } } } return iterState{done: true}, permanentErrors } func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error for i := is.record; i < len(events); i++ { event := events[i] // JSON encoding event and writing to buffer. b, jsonErr := marshalEvent(event, c.config.MaxEventSize) if jsonErr != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr))) continue } _, err := buf.Write(b) if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{ record: i, done: false, }, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{ record: i + 1, done: i+1 != len(events), }, permanentErrors } else if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) } } return iterState{done: true}, permanentErrors } func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error for i := is.resource; i < traces.ResourceSpans().Len(); i++ { rs := traces.ResourceSpans().At(i) for j := is.library; j < rs.ScopeSpans().Len(); j++ { is.library = 0 // Reset library index for next resource. ss := rs.ScopeSpans().At(j) for k := is.record; k < ss.Spans().Len(); k++ { is.record = 0 // Reset record index for next library. span := ss.Spans().At(k) // Parsing span record to Splunk event. event := mapSpanToSplunkEvent(rs.Resource(), span, c.config) // JSON encoding event and writing to buffer. b, err := marshalEvent(event, c.config.MaxEventSize) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) continue } // Continue adding events to buffer up to capacity. _, err = buf.Write(b) if err == nil { continue } if errors.Is(err, errOverCapacity) { if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+ " content length %d bytes", len(b), c.config.MaxContentLengthTraces))) return iterState{i, j, k + 1, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "error writing the event: %w", err))) } } } return iterState{done: true}, permanentErrors } // pushMultiMetricsDataInBatches sends batches of Splunk multi-metric events in JSON format. // The batch content length is restricted to MaxContentLengthMetrics. // md metrics are parsed to Splunk events. func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error { buf := c.bufferPool.get() defer c.bufferPool.put(buf) is := iterState{} var permanentErrors []error var events []*splunk.Event for i := 0; i < md.ResourceMetrics().Len(); i++ { rm := md.ResourceMetrics().At(i) for j := 0; j < rm.ScopeMetrics().Len(); j++ { sm := rm.ScopeMetrics().At(j) for k := 0; k < sm.Metrics().Len(); k++ { metric := sm.Metrics().At(k) // Parsing metric record to Splunk event. events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...) } } } merged, err := mergeEventsToMultiMetricFormat(events) if err != nil { return consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err)) } for !is.done { buf.Reset() latestIterState, batchPermanentErrors := c.fillMetricsBufferMultiMetrics(merged, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { return consumererror.NewMetrics(err, md) } } is = latestIterState } return multierr.Combine(permanentErrors...) } // pushMetricsDataInBatches sends batches of Splunk events in JSON format. // The batch content length is restricted to MaxContentLengthMetrics. // md metrics are parsed to Splunk events. func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error { buf := c.bufferPool.get() defer c.bufferPool.put(buf) is := iterState{} var permanentErrors []error for !is.done { buf.Reset() latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { return consumererror.NewMetrics(err, subMetrics(md, is)) } } is = latestIterState } return multierr.Combine(permanentErrors...) } // pushTracesDataInBatches sends batches of Splunk events in JSON format. // The batch content length is restricted to MaxContentLengthMetrics. // td traces are parsed to Splunk events. func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error { buf := c.bufferPool.get() defer c.bufferPool.put(buf) is := iterState{} var permanentErrors []error for !is.done { buf.Reset() latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, buf, is) permanentErrors = append(permanentErrors, batchPermanentErrors...) if !buf.Empty() { if err := c.postEvents(ctx, buf, headers); err != nil { return consumererror.NewTraces(err, subTraces(td, is)) } } is = latestIterState } return multierr.Combine(permanentErrors...) } func (c *client) postEvents(ctx context.Context, buf buffer, headers map[string]string) error { if err := buf.Close(); err != nil { return err } return c.hecWorker.send(ctx, buf, headers) } // subLogs returns a subset of logs starting from the state. func subLogs(src plog.Logs, state iterState) plog.Logs { if state.empty() { return src } dst := plog.NewLogs() resources := src.ResourceLogs() resourcesSub := dst.ResourceLogs() for i := state.resource; i < resources.Len(); i++ { newSub := resourcesSub.AppendEmpty() resources.At(i).Resource().CopyTo(newSub.Resource()) libraries := resources.At(i).ScopeLogs() librariesSub := newSub.ScopeLogs() j := 0 if i == state.resource { j = state.library } for ; j < libraries.Len(); j++ { lib := libraries.At(j) newLibSub := librariesSub.AppendEmpty() lib.Scope().CopyTo(newLibSub.Scope()) logs := lib.LogRecords() logsSub := newLibSub.LogRecords() k := 0 if i == state.resource && j == state.library { k = state.record } for ; k < logs.Len(); k++ { logs.At(k).CopyTo(logsSub.AppendEmpty()) } } } return dst } // subMetrics returns a subset of metrics starting from the state. func subMetrics(src pmetric.Metrics, state iterState) pmetric.Metrics { if state.empty() { return src } dst := pmetric.NewMetrics() resources := src.ResourceMetrics() resourcesSub := dst.ResourceMetrics() for i := state.resource; i < resources.Len(); i++ { newSub := resourcesSub.AppendEmpty() resources.At(i).Resource().CopyTo(newSub.Resource()) libraries := resources.At(i).ScopeMetrics() librariesSub := newSub.ScopeMetrics() j := 0 if i == state.resource { j = state.library } for ; j < libraries.Len(); j++ { lib := libraries.At(j) newLibSub := librariesSub.AppendEmpty() lib.Scope().CopyTo(newLibSub.Scope()) metrics := lib.Metrics() metricsSub := newLibSub.Metrics() k := 0 if i == state.resource && j == state.library { k = state.record } for ; k < metrics.Len(); k++ { metrics.At(k).CopyTo(metricsSub.AppendEmpty()) } } } return dst } func subTraces(src ptrace.Traces, state iterState) ptrace.Traces { if state.empty() { return src } dst := ptrace.NewTraces() resources := src.ResourceSpans() resourcesSub := dst.ResourceSpans() for i := state.resource; i < resources.Len(); i++ { newSub := resourcesSub.AppendEmpty() resources.At(i).Resource().CopyTo(newSub.Resource()) libraries := resources.At(i).ScopeSpans() librariesSub := newSub.ScopeSpans() j := 0 if i == state.resource { j = state.library } for ; j < libraries.Len(); j++ { lib := libraries.At(j) newLibSub := librariesSub.AppendEmpty() lib.Scope().CopyTo(newLibSub.Scope()) traces := lib.Spans() tracesSub := newLibSub.Spans() k := 0 if i == state.resource && j == state.library { k = state.record } for ; k < traces.Len(); k++ { traces.At(k).CopyTo(tracesSub.AppendEmpty()) } } } return dst } func (c *client) stop(context.Context) error { c.wg.Wait() if c.heartbeater != nil { c.heartbeater.shutdown() } return nil } func (c *client) start(ctx context.Context, host component.Host) (err error) { httpClient, err := buildHTTPClient(ctx, c.config, host, c.telemetrySettings) if err != nil { return err } if c.config.HecHealthCheckEnabled { healthCheckURL, _ := c.config.getURL() healthCheckURL.Path = c.config.HealthPath if err := checkHecHealth(ctx, httpClient, healthCheckURL); err != nil { return fmt.Errorf("%s: health check failed: %w", c.exporterName, err) } } url, _ := c.config.getURL() c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo), c.logger} c.heartbeater = newHeartbeater(c.config, c.buildInfo, getPushLogFn(c), c.meter) if c.config.Heartbeat.Startup { if err := c.heartbeater.sendHeartbeat(c.config, c.buildInfo, getPushLogFn(c)); err != nil { return fmt.Errorf("%s: heartbeat on startup failed: %w", c.exporterName, err) } } return nil } func checkHecHealth(ctx context.Context, client *http.Client, healthCheckURL *url.URL) error { req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckURL.String(), nil) if err != nil { return consumererror.NewPermanent(err) } resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() err = splunk.HandleHTTPCode(resp) if err != nil { return err } return nil } func buildHTTPClient(ctx context.Context, config *Config, host component.Host, telemetrySettings component.TelemetrySettings) (*http.Client, error) { // we handle compression explicitly. config.Compression = "" return config.ToClient(ctx, host, telemetrySettings) } func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]string { appVersion := config.SplunkAppVersion if appVersion == "" { appVersion = buildInfo.Version } return map[string]string{ "Connection": "keep-alive", "Content-Type": "application/json", "User-Agent": config.SplunkAppName + "/" + appVersion, "Authorization": splunk.HECTokenHeader + " " + string(config.Token), "__splunk_app_name": config.SplunkAppName, "__splunk_app_version": config.SplunkAppVersion, } } // marshalEvent marshals an event to JSON func marshalEvent(event *splunk.Event, sizeLimit uint) ([]byte, error) { b, err := json.Marshal(event) if err != nil { return nil, err } if uint(len(b)) > sizeLimit { return nil, fmt.Errorf("event size %d exceeds limit %d", len(b), sizeLimit) } return b, nil }