receiver/splunkhecreceiver/receiver.go (468 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package splunkhecreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver" import ( "compress/gzip" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" "sync" "time" "github.com/google/uuid" "github.com/gorilla/mux" jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver/internal/metadata" ) const ( defaultServerTimeout = 20 * time.Second ackResponse = `{"acks": %s}` responseOK = `{"text": "Success", "code": 0}` responseOKWithAckID = `{"text": "Success", "code": 0, "ackId": %d}` responseHecHealthy = `{"text": "HEC is healthy", "code": 17}` responseInvalidMethodPostOnly = `"Only \"POST\" method is supported"` responseInvalidEncoding = `"\"Content-Encoding\" must be \"gzip\" or empty"` responseInvalidDataFormat = `{"text":"Invalid data format","code":6}` responseErrEventRequired = `{"text":"Event field is required","code":12}` responseErrEventBlank = `{"text":"Event field cannot be blank","code":13}` responseErrGzipReader = `"Error on gzip body"` responseErrUnmarshalBody = `"Failed to unmarshal message body"` responseErrInternalServerError = `"Internal Server Error"` responseErrUnsupportedMetricEvent = `"Unsupported metric event"` responseErrUnsupportedLogEvent = `"Unsupported log event"` responseErrHandlingIndexedFields = `{"text":"Error in handling indexed fields","code":15,"invalid-event-number":%d}` responseErrDataChannelMissing = `{"text": "Data channel is missing","code":10}` responseErrInvalidDataChannel = `{"text": "Invalid data channel", "code": 11}` responseNoData = `{"text":"No data","code":5}` // Centralizing some HTTP and related string constants. gzipEncoding = "gzip" httpContentEncodingHeader = "Content-Encoding" httpContentTypeHeader = "Content-Type" httpJSONTypeHeader = "application/json" ) var ( errEmptyEndpoint = errors.New("empty endpoint") errInvalidMethod = errors.New("invalid http method") errInvalidEncoding = errors.New("invalid encoding") errExtensionMissing = errors.New("ack extension not found") okRespBody = []byte(responseOK) eventRequiredRespBody = []byte(responseErrEventRequired) eventBlankRespBody = []byte(responseErrEventBlank) requiredDataChannelHeader = []byte(responseErrDataChannelMissing) invalidEncodingRespBody = []byte(responseInvalidEncoding) invalidFormatRespBody = []byte(responseInvalidDataFormat) invalidMethodRespBodyPostOnly = []byte(responseInvalidMethodPostOnly) errGzipReaderRespBody = []byte(responseErrGzipReader) errUnmarshalBodyRespBody = []byte(responseErrUnmarshalBody) errInternalServerError = []byte(responseErrInternalServerError) errUnsupportedMetricEvent = []byte(responseErrUnsupportedMetricEvent) errUnsupportedLogEvent = []byte(responseErrUnsupportedLogEvent) noDataRespBody = []byte(responseNoData) ) // splunkReceiver implements the receiver.Metrics for Splunk HEC metric protocol. type splunkReceiver struct { settings receiver.Settings config *Config logsConsumer consumer.Logs metricsConsumer consumer.Metrics server *http.Server shutdownWG sync.WaitGroup obsrecv *receiverhelper.ObsReport gzipReaderPool *sync.Pool ackExt ackextension.AckExtension } var ( _ receiver.Metrics = (*splunkReceiver)(nil) _ receiver.Logs = (*splunkReceiver)(nil) ) // newReceiver creates the Splunk HEC receiver with the given configuration. func newReceiver(settings receiver.Settings, config Config) (*splunkReceiver, error) { if config.Endpoint == "" { return nil, errEmptyEndpoint } transport := "http" if config.TLSSetting != nil { transport = "https" } obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: settings.ID, Transport: transport, ReceiverCreateSettings: settings, }) if err != nil { return nil, err } r := &splunkReceiver{ settings: settings, config: &config, server: &http.Server{ Addr: config.Endpoint, // TODO: Evaluate what properties should be configurable, for now // set some hard-coded values. ReadHeaderTimeout: defaultServerTimeout, WriteTimeout: defaultServerTimeout, }, obsrecv: obsrecv, gzipReaderPool: &sync.Pool{New: func() any { return new(gzip.Reader) }}, } return r, nil } // Start tells the receiver to start its processing. // By convention the consumer of the received data is set when the receiver // instance is created. func (r *splunkReceiver) Start(ctx context.Context, host component.Host) error { // server.Handler will be nil on initial call, otherwise noop. if r.server != nil && r.server.Handler != nil { return nil } mx := mux.NewRouter() // set up the ack API handler if the ack extension is present if r.config.Extension != nil { ext, found := host.GetExtensions()[*r.config.Extension] if !found { return fmt.Errorf("specified ack extension with id %q could not be found", *r.config.Extension) } r.ackExt = ext.(ackextension.AckExtension) mx.NewRoute().Path(r.config.Ack.Path).HandlerFunc(r.handleAck) } mx.NewRoute().Path(r.config.HealthPath).HandlerFunc(r.handleHealthReq) mx.NewRoute().Path(r.config.HealthPath + "/1.0").HandlerFunc(r.handleHealthReq).Methods(http.MethodGet) if r.logsConsumer != nil { mx.NewRoute().Path(r.config.RawPath).HandlerFunc(r.handleRawReq) } mx.NewRoute().HandlerFunc(r.handleReq) // set up the listener ln, err := r.config.ToListener(ctx) if err != nil { return fmt.Errorf("failed to bind to address %s: %w", r.config.Endpoint, err) } r.server, err = r.config.ToServer(ctx, host, r.settings.TelemetrySettings, mx) if err != nil { return err } // TODO: Evaluate what properties should be configurable, for now // set some hard-coded values. r.server.ReadHeaderTimeout = defaultServerTimeout r.server.WriteTimeout = defaultServerTimeout r.shutdownWG.Add(1) go func() { defer r.shutdownWG.Done() if errHTTP := r.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP)) } }() return err } // Shutdown tells the receiver that should stop reception, // giving it a chance to perform any necessary clean-up. func (r *splunkReceiver) Shutdown(context.Context) error { err := r.server.Close() r.shutdownWG.Wait() return err } func (r *splunkReceiver) processSuccessResponseWithAck(resp http.ResponseWriter, channelID string) error { if r.ackExt == nil { panic("writing response with ack when ack extension is not configured") } ackID := r.ackExt.ProcessEvent(channelID) r.ackExt.Ack(channelID, ackID) return r.processSuccessResponse(resp, []byte(fmt.Sprintf(responseOKWithAckID, ackID))) } func (r *splunkReceiver) processSuccessResponse(resp http.ResponseWriter, bodyContent []byte) error { resp.Header().Set(httpContentTypeHeader, httpJSONTypeHeader) resp.WriteHeader(http.StatusOK) _, err := resp.Write(bodyContent) return err } func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) { if req.Method != http.MethodPost { r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod) return } // shouldn't run into this case since we only enable this handler IF ackExt exists. But we have this check just in case if r.ackExt == nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, errExtensionMissing) return } var channelID string var extracted bool channelID, extracted = r.extractChannel(req) if !extracted { r.failRequest(resp, http.StatusBadRequest, requiredDataChannelHeader, nil) return } if channelErr := r.validateChannelHeader(channelID); channelErr != nil { r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr) return } dec := json.NewDecoder(req.Body) var ackRequest splunk.AckRequest err := dec.Decode(&ackRequest) if err != nil { r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err) return } if len(ackRequest.Acks) == 0 { r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, errors.New("request body must include at least one ackID to be queried")) return } queriedAcks := r.ackExt.QueryAcks(channelID, ackRequest.Acks) ackString, _ := json.Marshal(queriedAcks) if err := r.processSuccessResponse(resp, []byte(fmt.Sprintf(ackResponse, ackString))); err != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err) } } func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Request) { ctx := req.Context() ctx = r.obsrecv.StartLogsOp(ctx) if req.Method != http.MethodPost { r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod) return } encoding := req.Header.Get(httpContentEncodingHeader) if encoding != "" && encoding != gzipEncoding { r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding) return } var channelID string var extracted bool if channelID, extracted = r.extractChannel(req); extracted { if channelErr := r.validateChannelHeader(channelID); channelErr != nil { r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr) return } } if req.ContentLength == 0 { r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, nil) r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil) return } bodyReader := req.Body if encoding == gzipEncoding { reader := r.gzipReaderPool.Get().(*gzip.Reader) err := reader.Reset(bodyReader) if err != nil { r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err) _, _ = io.ReadAll(req.Body) _ = req.Body.Close() return } bodyReader = reader defer r.gzipReaderPool.Put(reader) } resourceCustomizer := r.createResourceCustomizer(req) query := req.URL.Query() var timestamp pcommon.Timestamp if query.Has(queryTime) { t, err := strconv.ParseInt(query.Get(queryTime), 10, 64) if t < 0 { err = errors.New("time cannot be less than 0") } if err != nil { r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err) return } timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0)) } ld, slLen, err := splunkHecRawToLogData(bodyReader, query, resourceCustomizer, r.config, timestamp) if err != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err) return } consumerErr := r.logsConsumer.ConsumeLogs(ctx, ld) _ = bodyReader.Close() if consumerErr != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, consumerErr) } else { var ackErr error if len(channelID) > 0 && r.ackExt != nil { ackErr = r.processSuccessResponseWithAck(resp, channelID) } else { ackErr = r.processSuccessResponse(resp, okRespBody) } if ackErr != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err) } else { r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil) } } } func (r *splunkReceiver) extractChannel(req *http.Request) (string, bool) { // check header for k, v := range req.Header { if strings.EqualFold(k, splunk.HTTPSplunkChannelHeader) { return strings.ToUpper(v[0]), true } } // check query param for k, v := range req.URL.Query() { if strings.EqualFold(k, "channel") { return strings.ToUpper(v[0]), true } } return "", false } func (r *splunkReceiver) validateChannelHeader(channelID string) error { if len(channelID) == 0 { return errors.New(responseErrDataChannelMissing) } // channel id must be a valid uuid // https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/AboutHECIDXAck#:~:text=close%20the%20file.-,About%20channels%20and%20sending%20data,-Sending%20events%20to _, err := uuid.Parse(channelID) if err != nil { return errors.New(responseErrInvalidDataChannel) } return nil } func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) { ctx := req.Context() if req.Method != http.MethodPost { r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod) return } encoding := req.Header.Get(httpContentEncodingHeader) if encoding != "" && encoding != gzipEncoding { r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding) return } channelID, extracted := r.extractChannel(req) if extracted { if channelErr := r.validateChannelHeader(channelID); channelErr != nil { r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr) return } } bodyReader := req.Body if encoding == gzipEncoding { reader := r.gzipReaderPool.Get().(*gzip.Reader) err := reader.Reset(bodyReader) if err != nil { r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err) return } bodyReader = reader defer r.gzipReaderPool.Put(reader) } if req.ContentLength == 0 { r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil) return } dec := jsoniter.NewDecoder(bodyReader) var events []*splunk.Event var metricEvents []*splunk.Event for dec.More() { var msg splunk.Event err := dec.Decode(&msg) if err != nil { r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err) return } for _, v := range msg.Fields { if !isFlatJSONField(v) { r.failRequest(resp, http.StatusBadRequest, []byte(fmt.Sprintf(responseErrHandlingIndexedFields, len(events)+len(metricEvents))), nil) return } } if msg.IsMetric() { if r.metricsConsumer == nil { r.failRequest(resp, http.StatusBadRequest, errUnsupportedMetricEvent, err) return } metricEvents = append(metricEvents, &msg) } else { if msg.Event == nil { r.failRequest(resp, http.StatusBadRequest, eventRequiredRespBody, nil) return } if msg.Event == "" { r.failRequest(resp, http.StatusBadRequest, eventBlankRespBody, nil) return } if r.logsConsumer == nil { r.failRequest(resp, http.StatusBadRequest, errUnsupportedLogEvent, err) return } events = append(events, &msg) } } resourceCustomizer := r.createResourceCustomizer(req) if r.logsConsumer != nil && len(events) > 0 { ld, err := splunkHecToLogData(r.settings.Logger, events, resourceCustomizer, r.config) if err != nil { r.failRequest(resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err) return } ctx = r.obsrecv.StartLogsOp(ctx) decodeErr := r.logsConsumer.ConsumeLogs(ctx, ld) r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil) if decodeErr != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr) return } } if r.metricsConsumer != nil && len(metricEvents) > 0 { md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config) ctx = r.obsrecv.StartMetricsOp(ctx) decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil) if decodeErr != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr) return } } var ackErr error if len(channelID) > 0 && r.ackExt != nil { ackErr = r.processSuccessResponseWithAck(resp, channelID) } else { ackErr = r.processSuccessResponse(resp, okRespBody) } if ackErr != nil { r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, ackErr) } } func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pcommon.Resource) { if r.config.AccessTokenPassthrough { accessToken := req.Header.Get("Authorization") if strings.HasPrefix(accessToken, splunk.HECTokenHeader+" ") { accessTokenValue := accessToken[len(splunk.HECTokenHeader)+1:] return func(resource pcommon.Resource) { resource.Attributes().PutStr(splunk.HecTokenLabel, accessTokenValue) } } } return nil } func (r *splunkReceiver) failRequest( resp http.ResponseWriter, httpStatusCode int, jsonResponse []byte, err error, ) { resp.WriteHeader(httpStatusCode) if len(jsonResponse) > 0 { // The response needs to be written as a JSON string. resp.Header().Add("Content-Type", "application/json") _, writeErr := resp.Write(jsonResponse) if writeErr != nil { r.settings.Logger.Warn("Error writing HTTP response message", zap.Error(writeErr)) } } if r.settings.Logger.Core().Enabled(zap.DebugLevel) { msg := string(jsonResponse) r.settings.Logger.Debug( "Splunk HEC receiver request failed", zap.Int("http_status_code", httpStatusCode), zap.String("msg", msg), zap.Error(err), // It handles nil error ) } } func (r *splunkReceiver) handleHealthReq(writer http.ResponseWriter, _ *http.Request) { writer.Header().Add("Content-Type", "application/json") writer.WriteHeader(http.StatusOK) _, _ = writer.Write([]byte(responseHecHealthy)) } func isFlatJSONField(field any) bool { switch value := field.(type) { case map[string]any: return false case []any: for _, v := range value { switch v.(type) { case map[string]any, []any: return false } } } return true }