receiver/libhoneyreceiver/receiver.go (248 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "mime" "net" "net/http" "strings" "sync" "github.com/vmihailenco/msgpack/v5" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" ) type libhoneyReceiver struct { cfg *Config server *http.Server nextTraces consumer.Traces nextLogs consumer.Logs shutdownWG sync.WaitGroup obsreport *receiverhelper.ObsReport settings *receiver.Settings } // TeamInfo is part of the AuthInfo struct that stores the team slug type TeamInfo struct { Slug string `json:"slug"` } // EnvironmentInfo is part of the AuthInfo struct that stores the environment slug and name type EnvironmentInfo struct { Slug string `json:"slug"` Name string `json:"name"` } // AuthInfo is used by Libhoney to validate team and environment information against Honeycomb's Auth API type AuthInfo struct { APIKeyAccess map[string]bool `json:"api_key_access"` Team TeamInfo `json:"team"` Environment EnvironmentInfo `json:"environment"` } func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { r := &libhoneyReceiver{ cfg: cfg, nextTraces: nil, settings: set, } var err error r.obsreport, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "http", ReceiverCreateSettings: *set, }) if err != nil { return nil, err } return r, nil } func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { // If HTTP is not enabled, nothing to start. if r.cfg.HTTP == nil { return nil } httpMux := http.NewServeMux() r.settings.Logger.Info("r.nextTraces is not null so httpTracesReceiver was added", zap.Int("paths", len(r.cfg.HTTP.TracesURLPaths))) for _, path := range r.cfg.HTTP.TracesURLPaths { httpMux.HandleFunc(path, func(resp http.ResponseWriter, req *http.Request) { r.handleEvent(resp, req) }) r.settings.Logger.Debug("Added path to HTTP server", zap.String("path", path)) } if r.cfg.AuthAPI != "" { httpMux.HandleFunc("/1/auth", func(resp http.ResponseWriter, req *http.Request) { r.handleAuth(resp, req) }) } var err error if r.server, err = r.cfg.HTTP.ToServer(ctx, host, r.settings.TelemetrySettings, httpMux); err != nil { return err } r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.Endpoint)) var hln net.Listener if hln, err = r.cfg.HTTP.ToListener(ctx); err != nil { return err } r.shutdownWG.Add(1) go func() { defer r.shutdownWG.Done() if err := r.server.Serve(hln); err != nil && !errors.Is(err, http.ErrServerClosed) { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() return nil } func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { if err := r.startHTTPServer(ctx, host); err != nil { return errors.Join(err, r.Shutdown(ctx)) } return nil } // Shutdown is a method to turn off receiving. func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { var err error if r.server != nil { err = r.server.Shutdown(ctx) } r.shutdownWG.Wait() return err } func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { r.nextTraces = tc } func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { r.nextLogs = tc } func (r *libhoneyReceiver) handleAuth(resp http.ResponseWriter, req *http.Request) { authURL := fmt.Sprintf("%s/1/auth", r.cfg.AuthAPI) authReq, err := http.NewRequest(http.MethodGet, authURL, nil) if err != nil { errJSON, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } authReq.Header.Set("x-honeycomb-team", req.Header.Get("x-honeycomb-team")) var authClient http.Client authResp, err := authClient.Do(authReq) if err != nil { errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } defer authResp.Body.Close() switch { case authResp.StatusCode == http.StatusUnauthorized: errJSON, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) writeResponse(resp, "json", http.StatusBadRequest, errJSON) return case authResp.StatusCode > 299: errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } authRawBody, _ := io.ReadAll(authResp.Body) _, err = resp.Write(authRawBody) if err != nil { r.settings.Logger.Info("couldn't write http response") } } func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Request) { enc, ok := readContentType(resp, req) if !ok { return } dataset, err := parser.GetDatasetFromRequest(req.RequestURI) if err != nil { r.settings.Logger.Info("No dataset found in URL", zap.String("req.RequestURI", req.RequestURI)) } for _, p := range r.cfg.HTTP.TracesURLPaths { dataset = strings.Replace(dataset, p, "", 1) r.settings.Logger.Debug("dataset parsed", zap.String("dataset.parsed", dataset)) } body, err := io.ReadAll(req.Body) if err != nil { errorutil.HTTPError(resp, err) } if err = req.Body.Close(); err != nil { errorutil.HTTPError(resp, err) } libhoneyevents := make([]libhoneyevent.LibhoneyEvent, 0) switch req.Header.Get("Content-Type") { case "application/x-msgpack", "application/msgpack": decoder := msgpack.NewDecoder(bytes.NewReader(body)) decoder.UseLooseInterfaceDecoding(true) err = decoder.Decode(&libhoneyevents) if err != nil { r.settings.Logger.Info("messagepack decoding failed") } if len(libhoneyevents) > 0 { r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString())) } case encoder.JSONContentType: err = json.Unmarshal(body, &libhoneyevents) if err != nil { errorutil.HTTPError(resp, err) } if len(libhoneyevents) > 0 { r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) } } otlpLogs, otlpTraces := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) numLogs := otlpLogs.LogRecordCount() if numLogs > 0 { ctx := r.obsreport.StartLogsOp(context.Background()) err = r.nextLogs.ConsumeLogs(ctx, otlpLogs) r.obsreport.EndLogsOp(ctx, "protobuf", numLogs, err) } numTraces := otlpTraces.SpanCount() if numTraces > 0 { ctx := r.obsreport.StartTracesOp(context.Background()) err = r.nextTraces.ConsumeTraces(ctx, otlpTraces) r.obsreport.EndTracesOp(ctx, "protobuf", numTraces, err) } if err != nil { errorutil.HTTPError(resp, err) return } noErrors := []byte(`{"errors":[]}`) writeResponse(resp, enc.ContentType(), http.StatusAccepted, noErrors) } func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) { if req.Method != http.MethodPost { handleUnmatchedMethod(resp) return nil, false } switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { case encoder.JSONContentType: return encoder.JsEncoder, true case "application/x-msgpack", "application/msgpack": return encoder.MpEncoder, true default: handleUnmatchedContentType(resp) return nil, false } } func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { w.Header().Set("Content-Type", contentType) w.WriteHeader(statusCode) _, _ = w.Write(msg) } func getMimeTypeFromContentType(contentType string) string { mediatype, _, err := mime.ParseMediaType(contentType) if err != nil { return "" } return mediatype } func handleUnmatchedMethod(resp http.ResponseWriter) { status := http.StatusMethodNotAllowed writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) } func handleUnmatchedContentType(resp http.ResponseWriter) { status := http.StatusUnsupportedMediaType writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, encoder.JSONContentType, encoder.PbContentType))) }