x-pack/filebeat/input/httpjson/input.go (382 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package httpjson import ( "context" "encoding/json" "errors" "fmt" "io/fs" "net" "net/http" "net/url" "os" "path/filepath" "sort" "strings" "time" retryablehttp "github.com/hashicorp/go-retryablehttp" "go.elastic.co/ecszap" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/elastic/mito/lib/xml" v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/private" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/useragent" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/timed" ) const ( inputName = "httpjson" ) var ( userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) // for testing timeNow = time.Now ) // retryLogger provides a shim for a *logp.Logger to be used by // go-retryablehttp as a retryablehttp.LeveledLogger. type retryLogger struct { log *logp.Logger } func newRetryLogger(log *logp.Logger) *retryLogger { return &retryLogger{ log: log.Named("retryablehttp").WithOptions(zap.AddCallerSkip(1)), } } func (log *retryLogger) Error(msg string, keysAndValues ...interface{}) { log.log.Errorw(msg, keysAndValues...) } func (log *retryLogger) Info(msg string, keysAndValues ...interface{}) { log.log.Infow(msg, keysAndValues...) } func (log *retryLogger) Debug(msg string, keysAndValues ...interface{}) { log.log.Debugw(msg, keysAndValues...) } func (log *retryLogger) Warn(msg string, keysAndValues ...interface{}) { log.log.Warnw(msg, keysAndValues...) } func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, Deprecated: false, Manager: NewInputManager(log, store), } } type redact struct { value mapstrM fields []string } func (r redact) MarshalLogObject(enc zapcore.ObjectEncoder) error { v, err := private.Redact(r.value, "", r.fields) if err != nil { return fmt.Errorf("could not redact value: %v", err) } return v.MarshalLogObject(enc) } // mapstrM is a non-mutating version of mapstr.M. // See https://github.com/elastic/elastic-agent-libs/issues/232. type mapstrM mapstr.M // MarshalLogObject implements the zapcore.ObjectMarshaler interface and allows // for more efficient marshaling of mapstrM in structured logging. func (m mapstrM) MarshalLogObject(enc zapcore.ObjectEncoder) error { if len(m) == 0 { return nil } keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { v := m[k] if inner, ok := tryToMapStr(v); ok { err := enc.AddObject(k, inner) if err != nil { return fmt.Errorf("failed to add object: %w", err) } continue } zap.Any(k, v).AddTo(enc) } return nil } func tryToMapStr(v interface{}) (mapstrM, bool) { switch m := v.(type) { case mapstrM: return m, true case map[string]interface{}: return mapstrM(m), true default: return nil, false } } func test(url *url.URL) error { port := func() string { if url.Port() != "" { return url.Port() } switch url.Scheme { case "https": return "443" } return "80" }() _, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second) if err != nil { return fmt.Errorf("url %q is unreachable", url) } return nil } func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor) error { reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil) defer unreg() return run(ctx, cfg, pub, crsr, reg) } func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error { log := ctx.Logger.With("input_url", cfg.Request.URL) stdCtx := ctxtool.FromCanceller(ctx.Cancelation) if cfg.Request.Tracer != nil { id := sanitizeFileName(ctx.IDWithoutName) cfg.Request.Tracer.Filename = strings.ReplaceAll(cfg.Request.Tracer.Filename, "*", id) // Propagate tracer behaviour to all chain children. for i, c := range cfg.Chain { if c.Step != nil { // Request is validated as required. cfg.Chain[i].Step.Request.Tracer = cfg.Request.Tracer } if c.While != nil { // Request is validated as required. cfg.Chain[i].While.Request.Tracer = cfg.Request.Tracer } } } metrics := newInputMetrics(reg) client, err := newHTTPClient(stdCtx, cfg, log, reg) if err != nil { return err } requestFactory, err := newRequestFactory(stdCtx, cfg, log, metrics, reg) if err != nil { log.Errorf("Error while creating requestFactory: %v", err) return err } var xmlDetails map[string]xml.Detail if cfg.Response.XSD != "" { xmlDetails, err = xml.Details([]byte(cfg.Response.XSD)) if err != nil { log.Errorf("error while collecting xml decoder type hints: %v", err) return err } } pagination := newPagination(cfg, client, log) responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log) requester := newRequester(client, requestFactory, responseProcessor, metrics, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(cfg.Cursor, log) trCtx.cursor.load(crsr) doFunc := func() error { defer func() { // Clear response bodies between evaluations. trCtx.firstResponse.body = nil trCtx.lastResponse.body = nil }() log.Info("Process another repeated request.") startTime := time.Now() var err error if err = requester.doRequest(stdCtx, trCtx, pub); err != nil { log.Errorf("Error while processing http request: %v", err) } metrics.updateIntervalMetrics(err, startTime) if err := stdCtx.Err(); err != nil { return err } return nil } // we trigger the first call immediately, // then we schedule it on the given interval using timed.Periodic if err = doFunc(); err == nil { err = timed.Periodic(stdCtx, cfg.Interval, doFunc) } log.Infof("Input stopped because context was cancelled with: %v", err) return nil } // sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances. // The request.tracer.filename may have ":" when a httpjson input has cursor config and // the macOS Finder will treat this as path-separator and causes to show up strange filepaths. func sanitizeFileName(name string) string { name = strings.ReplaceAll(name, ":", string(filepath.Separator)) name = filepath.Clean(name) return strings.ReplaceAll(name, string(filepath.Separator), "_") } func newHTTPClient(ctx context.Context, config config, log *logp.Logger, reg *monitoring.Registry) (*httpClient, error) { client, err := newNetHTTPClient(ctx, config.Request, log, reg) if err != nil { return nil, err } if config.Request.Retry.getMaxAttempts() > 1 { // Make retryable HTTP client if needed. client = (&retryablehttp.Client{ HTTPClient: client, Logger: newRetryLogger(log), RetryWaitMin: config.Request.Retry.getWaitMin(), RetryWaitMax: config.Request.Retry.getWaitMax(), RetryMax: config.Request.Retry.getMaxAttempts(), CheckRetry: retryablehttp.DefaultRetryPolicy, Backoff: retryablehttp.DefaultBackoff, }).StandardClient() } limiter := newRateLimiterFromConfig(config.Request.RateLimit, log) if config.Auth.OAuth2.isEnabled() { authClient, err := config.Auth.OAuth2.client(ctx, client) if err != nil { return nil, err } return &httpClient{client: authClient, limiter: limiter}, nil } return &httpClient{client: client, limiter: limiter}, nil } // lumberjackTimestamp is a glob expression matching the time format string used // by lumberjack when rolling over logs, "2006-01-02T15-04-05.000". // https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39 const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]" func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger, reg *monitoring.Registry) (*http.Client, error) { netHTTPClient, err := cfg.Transport.Client(clientOptions(cfg.URL.URL, cfg.KeepAlive.settings())...) if err != nil { return nil, err } if cfg.Tracer.enabled() { w := zapcore.AddSync(cfg.Tracer) go func() { // Close the logger when we are done. <-ctx.Done() cfg.Tracer.Close() }() core := ecszap.NewCore( ecszap.NewDefaultEncoderConfig(), w, zap.DebugLevel, ) traceLogger := zap.New(core) maxBodyLen := cfg.Tracer.MaxSize * 1e6 / 10 // 10% of file max netHTTPClient.Transport = httplog.NewLoggingRoundTripper(netHTTPClient.Transport, traceLogger, maxBodyLen, log) } else if cfg.Tracer != nil { // We have a trace log name, but we are not enabled, // so remove all trace logs we own. err = os.Remove(cfg.Tracer.Filename) if err != nil && !errors.Is(err, fs.ErrNotExist) { log.Errorw("failed to remove request trace log", "path", cfg.Tracer.Filename, "error", err) } ext := filepath.Ext(cfg.Tracer.Filename) base := strings.TrimSuffix(cfg.Tracer.Filename, ext) paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext) if err != nil { log.Errorw("failed to collect request trace log path names", "error", err) } for _, p := range paths { err = os.Remove(p) if err != nil && !errors.Is(err, fs.ErrNotExist) { log.Errorw("failed to remove request trace log", "path", p, "error", err) } } } if reg != nil { netHTTPClient.Transport = httpmon.NewMetricsRoundTripper(netHTTPClient.Transport, reg) } netHTTPClient.CheckRedirect = checkRedirect(cfg, log) return netHTTPClient, nil } func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, reg *monitoring.Registry, p ...*Policy) (*httpClient, error) { client, err := newNetHTTPClient(ctx, requestCfg, log, reg) if err != nil { return nil, err } var retryPolicyFunc retryablehttp.CheckRetry if len(p) != 0 { retryPolicyFunc = p[0].CustomRetryPolicy } else { retryPolicyFunc = retryablehttp.DefaultRetryPolicy } if requestCfg.Retry.getMaxAttempts() > 1 { // Make retryable HTTP client if needed. client = (&retryablehttp.Client{ HTTPClient: client, Logger: newRetryLogger(log), RetryWaitMin: requestCfg.Retry.getWaitMin(), RetryWaitMax: requestCfg.Retry.getWaitMax(), RetryMax: requestCfg.Retry.getMaxAttempts(), CheckRetry: retryPolicyFunc, Backoff: retryablehttp.DefaultBackoff, }).StandardClient() } limiter := newRateLimiterFromConfig(requestCfg.RateLimit, log) if authCfg != nil && authCfg.OAuth2.isEnabled() { authClient, err := authCfg.OAuth2.client(ctx, client) if err != nil { return nil, err } return &httpClient{client: authClient, limiter: limiter}, nil } return &httpClient{client: client, limiter: limiter}, nil } // clientOption returns constructed client configuration options, including // setting up http+unix and http+npipe transports if requested. func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { scheme, trans, ok := strings.Cut(u.Scheme, "+") var dialer transport.Dialer switch { default: fallthrough case !ok: return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), keepalive, } // We set the host for the unix socket and Windows named // pipes schemes because the http.Transport expects to // have a host and will error out if it is not present. // The values here are just non-zero with a helpful name. // They are not used in any logic. case trans == "unix": u.Host = "unix-socket" dialer = socketDialer{u.Path} case trans == "npipe": u.Host = "windows-npipe" dialer = npipeDialer{u.Path} } u.Scheme = scheme return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), keepalive, httpcommon.WithBaseDialer(dialer), } } // socketDialer implements transport.Dialer to a constant socket path. type socketDialer struct { path string } func (d socketDialer) Dial(_, _ string) (net.Conn, error) { return net.Dial("unix", d.path) } func (d socketDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) { var nd net.Dialer return nd.DialContext(ctx, "unix", d.path) } func checkRedirect(config *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { log.Debug("http client: checking redirect") if len(via) >= config.RedirectMaxRedirects { log.Debug("http client: max redirects exceeded") return fmt.Errorf("stopped after %d redirects", config.RedirectMaxRedirects) } if !config.RedirectForwardHeaders || len(via) == 0 { log.Debugf("http client: nothing to do while checking redirects - forward_headers: %v, via: %#v", config.RedirectForwardHeaders, via) return nil } prev := via[len(via)-1] // previous request to get headers from log.Debugf("http client: forwarding headers from previous request: %#v", prev.Header) req.Header = prev.Header.Clone() for _, k := range config.RedirectHeadersBanList { log.Debugf("http client: ban header %v", k) req.Header.Del(k) } return nil } } func makeEvent(body mapstr.M) (beat.Event, error) { bodyBytes, err := json.Marshal(body) if err != nil { return beat.Event{}, err } now := timeNow() fields := mapstr.M{ "event": mapstr.M{ "created": now, }, "message": string(bodyBytes), } return beat.Event{ Timestamp: now, Fields: fields, }, nil }