x-pack/filebeat/input/cel/input.go (1,054 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Package cel implements an input that uses the Common Expression Language to
// perform requests and do endpoint processing of events. The cel package exposes
// the github.com/elastic/mito/lib CEL extension library.
package cel
import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"regexp"
"slices"
"strconv"
"strings"
"time"
"github.com/hashicorp/go-retryablehttp"
"github.com/icholy/digest"
"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
"google.golang.org/protobuf/types/known/structpb"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/timed"
"github.com/elastic/mito/lib"
)
const (
	// inputName is the name of the input processor.
	inputName = "cel"

	// root is the label of the object through which the input state is
	// exposed to the CEL program.
	root = "state"
)
// The Filebeat user-agent is provided to the program as useragent. If a request
// is not given a user-agent string, this user agent is added to the request.
var userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String())
// Plugin returns the v2 input plugin descriptor for the cel input,
// wiring its input manager to the provided logger and state store.
func Plugin(log *logp.Logger, store statestore.States) v2.Plugin {
	return v2.Plugin{
		Name:      inputName,
		Stability: feature.Stable,
		Manager:   NewInputManager(log, store),
	}
}
// input implements the CEL input. Its only state is an optional time
// source, allowing tests to substitute a deterministic clock.
type input struct {
	// time, when non-nil, replaces time.Now as the clock used by now.
	time func() time.Time
}

// now reports the current time from the configured time source, falling
// back to time.Now when no source has been set.
func (i input) now() time.Time {
	if i.time != nil {
		return i.time()
	}
	return time.Now()
}
// Name returns the registered name of the input.
func (input) Name() string { return inputName }
// Test implements the v2 input connection test, probing the configured
// resource URL. Non-HTTP(S) schemes (e.g. file paths) pass trivially
// since no client connection is needed.
func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
	cfg := src.(*source).cfg
	if !wantClient(cfg) {
		return nil
	}
	return test(cfg.Resource.URL.URL)
}
// Run starts the input and blocks until it completes. It will return on
// context cancellation or type invalidity errors; any other error will be
// retried. The input's status is reported to env across the lifecycle:
// Starting, then Running (within run), and finally Stopped or Failed.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
	var cursor map[string]interface{}
	env.UpdateStatus(status.Starting, "")
	if !crsr.IsNew() { // Allow the user to bootstrap the program if needed.
		err := crsr.Unpack(&cursor)
		if err != nil {
			env.UpdateStatus(status.Failed, "failed to unpack cursor: "+err.Error())
			return err
		}
	}
	err := input{}.run(env, src.(*source), cursor, pub)
	if err != nil {
		env.UpdateStatus(status.Failed, "failed to run: "+err.Error())
		return err
	}
	env.UpdateStatus(status.Stopped, "")
	return nil
}
// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances.
// The request.tracer.filename may have ":" when a cel input has cursor config and
// the macOS Finder will treat this as path-separator and causes to show up strange filepaths.
func sanitizeFileName(name string) string {
	sep := string(filepath.Separator)
	cleaned := filepath.Clean(strings.ReplaceAll(name, ":", sep))
	return strings.ReplaceAll(cleaned, sep, "_")
}
// run is the main processing loop of the input. It builds the HTTP
// client, rate limiter and CEL program from the configuration, seeds
// the evaluation state with the configured state, resource URL and any
// persisted cursor, and then evaluates the program on the configured
// interval, publishing the returned events and persisting cursors.
// It returns nil on context cancellation, and otherwise propagates
// fatal errors (type invalidity, handleResponse failures) to Run.
func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
	cfg := src.cfg
	log := env.Logger.With("input_url", cfg.Resource.URL)

	metrics, reg := newInputMetrics(env.ID)
	defer metrics.Close()

	ctx := ctxtool.FromCanceller(env.Cancelation)

	// Expand the "*" placeholder in the trace file name with a
	// filesystem-safe form of the input's ID.
	if cfg.Resource.Tracer != nil {
		id := sanitizeFileName(env.IDWithoutName)
		cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
	}

	client, trace, err := newClient(ctx, cfg, log, reg)
	if err != nil {
		return err
	}

	limiter := newRateLimiterFromConfig(cfg.Resource)

	patterns, err := regexpsFromConfig(cfg)
	if err != nil {
		return err
	}

	var auth *lib.BasicAuth
	if cfg.Auth.Basic.isEnabled() {
		auth = &lib.BasicAuth{
			Username: cfg.Auth.Basic.User,
			Password: cfg.Auth.Basic.Password,
		}
	}
	// Failure dumps and coverage recording are both opt-in; coverage
	// additionally requires debug logging to be active.
	wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
	doCov := cfg.RecordCoverage && log.IsDebug()
	httpOptions := lib.HTTPOptions{
		Limiter:     limiter,
		BasicAuth:   auth,
		Headers:     cfg.Resource.Headers,
		MaxBodySize: cfg.Resource.MaxBodySize,
	}
	prg, ast, cov, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, httpOptions, patterns, cfg.XSDs, log, trace, wantDump, doCov)
	if err != nil {
		return err
	}

	var state map[string]interface{}
	if cfg.State == nil {
		state = make(map[string]interface{})
	} else {
		state = cfg.State
	}
	if cursor != nil {
		state["cursor"] = cursor
	}
	// goodCursor/goodURL hold the last values known to be valid; they
	// are used to recover when an evaluation drops or corrupts them.
	goodCursor := cursor
	goodURL := cfg.Resource.URL.String()
	state["url"] = goodURL
	metrics.resource.Set(goodURL)
	env.UpdateStatus(status.Running, "")

	// On entry, state is expected to be in the shape:
	//
	// {
	//     "url": <resource address>,
	//     "cursor": { ... },
	//     ...
	// }
	//
	// The url field must be present and can be an HTTP end-point
	// or a file path. It is currently the responsibility of the
	// program to handle removing the scheme from a file url if it
	// is present. The url may be mutated during execution of the
	// program but the mutated state will not be persisted between
	// restarts and the url must be present in the returned value
	// to ensure that it is available in the next evaluation unless
	// the program has the resource address hard-coded in or it is
	// available from the cursor.
	//
	// Additional fields may be present at the root of the object
	// and if the program tolerates it, the cursor value may be
	// absent. Only the cursor is persisted over restarts, but
	// all fields in state are retained between iterations of
	// the processing loop except for the produced events array,
	// see discussion below.
	//
	// If the cursor is present the program should perform and
	// process requests based on its value. If cursor is not
	// present the program must have alternative logic to
	// determine what requests to make.
	//
	// In addition to this and the functions and globals available
	// from mito/lib, a global, useragent, is available to use
	// in requests.
	err = periodically(ctx, cfg.Interval, func() error {
		log.Info("process repeated request")
		var (
			budget    = *cfg.MaxExecutions
			waitUntil time.Time
		)
		// Keep track of whether CEL is degraded for this periodic run.
		var isDegraded bool
		if doCov {
			defer func() {
				// If doCov is true, log the updated coverage details.
				// Updates are a running aggregate for each call to run
				// as cov is shared via the program compilation.
				log.Debugw("coverage", "details", cov.Details())
			}()
		}
		for {
			if wait := time.Until(waitUntil); wait > 0 {
				// We have a special-case wait for when we have a zero limit.
				// x/time/rate allow a burst through even when the limit is zero
				// so in order to ensure that we don't try until we are out of
				// purgatory we calculate how long we should wait according to
				// the retry after for a 429 and rate limit headers if we have
				// a zero rate quota. See handleResponse below.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(wait):
				}
			} else if err = ctx.Err(); err != nil {
				// Otherwise exit if we have been cancelled.
				return err
			}

			// Process a set of event requests.
			if trace != nil {
				log.Debugw("previous transaction", "transaction.id", trace.TxID())
			}
			log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			metrics.executions.Add(1)
			start := i.now().In(time.UTC)
			state, err = evalWith(ctx, prg, ast, state, start, wantDump)
			log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			if err != nil {
				var dump dumpError
				switch {
				case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
					return err
				case errors.As(err, &dump):
					// Write the failure dump to the configured path with
					// the evaluation timestamp inserted before the
					// extension.
					path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
					dir := filepath.Dir(path)
					base := filepath.Base(path)
					ext := filepath.Ext(base)
					prefix := strings.TrimSuffix(base, ext)
					path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
					log.Debugw("writing failure dump file", "path", path)
					err := dump.writeToFile(path)
					if err != nil {
						log.Errorw("failed to write failure dump", "path", path, "error", err)
					}
				}
				log.Errorw("failed evaluation", "error", err)
				env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
			}
			isDegraded = err != nil
			metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
			if trace != nil {
				log.Debugw("final transaction", "transaction.id", trace.TxID())
			}

			// On exit, state is expected to be in the shape:
			//
			// {
			//     "cursor": [
			//         {...},
			//         ...
			//     ],
			//     "events": [
			//         {...},
			//         ...
			//     ],
			//     "url": <resource address>,
			//     "status_code": <HTTP request status code if a network request>,
			//     "header": <HTTP response headers if a network request>,
			//     "rate_limit": <HTTP rate limit map if required by API>,
			//     "want_more": bool
			// }
			//
			// The "events" array must be present, but may be empty or null.
			// In the case of an error condition in the CEL program it is
			// acceptable to return a single object which will be wrapped as
			// an array below. It is the responsibility of the downstream
			// processor to handle this object correctly (which may be to drop
			// the event). The error event will also be logged.
			// If it is not empty, it must only have objects as elements.
			// Additional fields may be present at the root of the object.
			// The evaluation is repeated with the new state, after removing
			// the events field, if the "want_more" field is present and true
			// and a non-zero events array is returned.
			//
			// If cursor is present it must be either a single object or an
			// array with the same length as events; each element i of the
			// cursor array will be the details for obtaining the events at or
			// beyond event i in events. If the cursor is a single object it
			// is will be the details for obtaining events after the last
			// event in the events array and will only be retained on
			// successful publication of all the events in the array.
			//
			// If rate_limit is present it should be a map with numeric fields
			// rate and burst. It may also have a string error field and
			// other fields which will be logged. If it has an error field
			// the rate and burst will not be used to set rate limit behaviour.
			//
			// The status code and rate_limit values may be omitted if they do
			// not contribute to control.
			//
			// The following details how a cursor array works:
			//
			// Result after request resulting in 5 events. Each c obtained with
			// an e points to the ~next e.
			//
			//    +----+   +----+        +----+
			//    | e1 |   | c1 |        | e1 |
			//    +----+   +----+        +----+    +----+
			//    | e2 |   | c2 |        | e2 |  < | c1 |
			//    +----+   +----+        +----+    +----+
			//    | e3 |   | c3 |        | e3 |  < | c2 |
			//    +----+   +----+   =>   +----+    +----+
			//    | e4 |   | c4 |        | e4 |  < | c3 |
			//    +----+   +----+        +----+    +----+
			//    | e5 |   | c5 |        | e5 |  < | c4 |
			//    +----+   +----+        +----+    +----+
			//                           |next|  < | c5 |
			//                           +----+    +----+
			//
			// After a successful publication this will leave a single c and
			// and empty events array. So the next evaluation has a boot.
			//
			// If the publication fails or execution is terminated at some
			// point during the events array, we may end up with, e.g.
			//
			//    +----+   +----+        +----+    +----+
			//    | e3 |   | c3 |        |next|  < | c3 |
			//    +----+   +----+   =>   +----+    +----+
			//    | e4 |   | c4 |
			//    +----+   +----+        lost events
			//    | e5 |   | c5 |
			//    +----+   +----+
			//
			// At this point, the c3 cursor (or at worst the c2 cursor) has
			// been stored and we can continue from that point, recovering
			// the lost events and potentially re-requesting e3.
			var ok bool
			ok, waitUntil, err = handleResponse(log, state, limiter)
			if err != nil {
				return err
			}
			if !ok {
				continue
			}

			// Restore the URL from the last valid value if the program
			// dropped it from the returned state.
			_, ok = state["url"]
			if !ok && goodURL != "" {
				state["url"] = goodURL
				log.Debugw("adding missing url from last valid value: state did not contain a url", "last_valid_url", goodURL)
			}

			e, ok := state["events"]
			if !ok {
				return errors.New("unexpected missing events array from evaluation")
			}
			var events []interface{}
			switch e := e.(type) {
			case []interface{}:
				if len(e) == 0 {
					return nil
				}
				events = e
			case map[string]interface{}:
				if e == nil {
					return nil
				}
				log.Errorw("single event object returned by evaluation", "event", e)
				if err, ok := e["error"]; ok {
					env.UpdateStatus(status.Degraded, fmt.Sprintf("single event error object returned by evaluation: %s", mapstr.M{"error": err}))
				} else {
					env.UpdateStatus(status.Degraded, "single event object returned by evaluation")
				}
				isDegraded = true
				events = []interface{}{e}
				// Make sure the cursor is not updated.
				delete(state, "cursor")
			default:
				return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
			}

			// We have a non-empty batch of events to process.
			metrics.batchesReceived.Add(1)
			metrics.eventsReceived.Add(uint64(len(events)))

			// Drop events from state. If we fail during the publication,
			// we will re-request these events.
			delete(state, "events")

			// Get cursors if they exist.
			var (
				cursors      []interface{}
				singleCursor bool
			)
			if c, ok := state["cursor"]; ok {
				cursors, ok = c.([]interface{})
				if ok {
					if len(cursors) != len(events) {
						log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events))
						env.UpdateStatus(status.Degraded, "unexpected cursor list length")
						isDegraded = true
						// But try to continue.
						if len(cursors) < len(events) {
							cursors = nil
						}
					}
				} else {
					cursors = []interface{}{c}
					singleCursor = true
				}
			}
			// Drop old cursor from state. This will be replaced with
			// the current cursor object below; it is an array now.
			delete(state, "cursor")

			start = time.Now()
			var hadPublicationError bool
			for i, e := range events {
				event, ok := e.(map[string]interface{})
				if !ok {
					return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
				}
				var pubCursor interface{}
				if cursors != nil {
					if singleCursor {
						// Only set the cursor for publication at the last event
						// when a single cursor object has been provided.
						if i == len(events)-1 {
							goodCursor = cursor
							cursor, ok = cursors[0].(map[string]interface{})
							if !ok {
								return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[0])
							}
							pubCursor = cursor
						}
					} else {
						goodCursor = cursor
						cursor, ok = cursors[i].(map[string]interface{})
						if !ok {
							return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[i])
						}
						pubCursor = cursor
					}
				}
				err = pub.Publish(beat.Event{
					Timestamp: time.Now(),
					Fields:    event,
				}, pubCursor)
				if err != nil {
					hadPublicationError = true
					log.Errorw("error publishing event", "error", err)
					env.UpdateStatus(status.Degraded, "error publishing event: "+err.Error())
					isDegraded = true
					cursors = nil // We are lost, so retry with this event's cursor,
					continue      // but continue with the events that we have without
					// advancing the cursor. This allows us to potentially publish the
					// events we have now, with a fallback to the last guaranteed
					// correctly published cursor.
				}
				if i == 0 {
					metrics.batchesPublished.Add(1)
				}
				metrics.eventsPublished.Add(1)

				err = ctx.Err()
				if err != nil {
					return err
				}
			}
			if !isDegraded {
				env.UpdateStatus(status.Running, "")
			}
			metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())

			// Advance the cursor to the final state if there was no error during
			// publications. This is needed to transition to the next set of events.
			if !hadPublicationError {
				goodCursor = cursor
			}

			// Replace the last known good cursor.
			state["cursor"] = goodCursor

			if more, _ := state["want_more"].(bool); !more {
				return nil
			}

			// Check we have a remaining execution budget.
			budget--
			if budget <= 0 {
				log.Warnw("exceeding maximum number of CEL executions", "limit", *cfg.MaxExecutions)
				env.UpdateStatus(status.Degraded, "exceeding maximum number of CEL executions")
				return nil
			}
		}
	})
	switch {
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		// Cancellation is the normal shutdown path, not a failure.
		log.Infof("input stopped because context was cancelled with: %v", err)
		err = nil
	}
	return err
}
// periodically runs fn immediately, and then at each interval tick
// until fn returns an error or ctx is cancelled.
func periodically(ctx context.Context, each time.Duration, fn func() error) error {
	if err := fn(); err != nil {
		return err
	}
	return timed.Periodic(ctx, each, fn)
}
// handleResponse checks the response status code and handles rate limit changes.
// It consumes (and deletes) the "header", "rate_limit" and "status_code"
// fields from state. It returns ok=true if the response is valid, otherwise
// false for a retry, along with a time before which no new request should
// be made (for zero-quota and 429 handling).
func handleResponse(log *logp.Logger, state map[string]interface{}, limiter *rate.Limiter) (ok bool, waitUntil time.Time, err error) {
	var header http.Header
	h, ok := state["header"]
	if ok {
		delete(state, "header")
		switch h := h.(type) {
		case http.Header:
			header = h
		case map[string][]string:
			header = h
		case map[string]interface{}:
			// Coerce the loosely typed header map produced by the CEL
			// program into an http.Header.
			header = make(http.Header)
			for k, v := range h {
				switch v := v.(type) {
				case []string:
					header[k] = v
				case []interface{}:
					vals := make([]string, len(v))
					for i, e := range v {
						vals[i], ok = e.(string)
						if !ok {
							// Report the offending element's type; this
							// previously reported the enclosing slice's type,
							// which is always []interface{} here.
							return false, time.Time{}, fmt.Errorf("unexpected type returned for response header value: %T", e)
						}
					}
					header[k] = vals
				default:
					return false, waitUntil, fmt.Errorf("unexpected type returned for response header value set: %T", v)
				}
			}
		default:
			return false, waitUntil, fmt.Errorf("unexpected type returned for response header: %T", h)
		}
	}
	r, ok := state["rate_limit"]
	if ok {
		delete(state, "rate_limit")
		switch r := r.(type) {
		case map[string]interface{}:
			// The state of rate-limit headers is a disaster. This needs to be
			// more robust, but there is no real consensus and the RFC is not
			// past draft yet. The draft is more sane than what we have now, but
			// still has a lot of complexity. Note that the RFC draft says that
			// this behaviour should be in the common path, not just in the 429
			// path.
			waitUntil = handleRateLimit(log, r, header, limiter)
		default:
			// Report the rate_limit value; this previously formatted the
			// header value h with a "response header" message — a
			// copy/paste error from the switch above.
			return false, waitUntil, fmt.Errorf("unexpected type returned for rate limit: %T", r)
		}
	}
	sc, ok := state["status_code"]
	if ok {
		delete(state, "status_code")
		var statusCode int
		switch sc := sc.(type) {
		case int:
			statusCode = sc
		case int64:
			statusCode = int(sc)
		case float64:
			statusCode = int(sc)
		default:
			return false, waitUntil, fmt.Errorf("unexpected type returned for request status code: %T", sc)
		}
		switch statusCode {
		case http.StatusOK:
			return true, time.Time{}, nil
		case http.StatusTooManyRequests:
			// https://datatracker.ietf.org/doc/html/rfc6585#page-3
			retry := header.Get("Retry-After")
			if d, err := strconv.Atoi(retry); err == nil {
				t := time.Now().Add(time.Duration(d) * time.Second)
				if t.After(waitUntil) {
					waitUntil = t
				}
			} else if t, err := time.Parse(http.TimeFormat, retry); err == nil {
				if t.After(waitUntil) {
					waitUntil = t
				}
			}
			return false, waitUntil, nil
		default:
			status := http.StatusText(statusCode)
			if status == "" {
				status = "unknown status code"
			}
			// Surface the failure to the CEL program as an error event
			// rather than failing the input.
			state["events"] = errorMessage(fmt.Sprintf("failed http request with %s: %d", status, statusCode))
			return true, time.Time{}, nil
		}
	}
	return true, waitUntil, nil
}
// handleRateLimit applies the rate and burst described by the CEL
// program's rate_limit map to limiter, returning a time before which no
// new request should be made when the current quota is exhausted.
//
// The map must contain a numeric "rate" field and should contain a
// numeric "burst". If an "error" field is present, rate and burst are
// not applied. When rate is zero or negative, the "reset" time and
// "next" rate are used to schedule the limiter's recovery.
func handleRateLimit(log *logp.Logger, rateLimit map[string]interface{}, header http.Header, limiter *rate.Limiter) (waitUntil time.Time) {
	if e, ok := rateLimit["error"]; ok {
		// The error field should be a string, but we won't quibble here.
		log.Errorw("rate limit error", "error", e, "rate_limit", mapstr.M(rateLimit), "header", header)
		return waitUntil
	}

	limit, ok := getLimit("rate", rateLimit, log)
	if !ok {
		return waitUntil
	}

	var burst int
	if b, ok := rateLimit["burst"]; !ok {
		// A missing burst is tolerated; it is forced to at least one
		// below. Previously the nil value also fell through to the type
		// switch and logged a spurious "unexpected type" error.
		log.Warnw("rate limit missing burst", "rate_limit", mapstr.M(rateLimit))
	} else {
		switch b := b.(type) {
		case int:
			burst = b
		case int64:
			burst = int(b)
		case float64:
			burst = int(b)
		default:
			log.Errorw("unexpected type returned for rate limit burst", "type", fmt.Sprintf("%T", b), "rate_limit", mapstr.M(rateLimit))
		}
	}
	if burst < 1 {
		// Make sure we can make at least one new request, even if we fail
		// to get a non-zero rate.Limit. We could set to zero for the case
		// that limit=rate.Inf, but that detail is not important.
		burst = 1
	}

	// Process reset if we need to wait until reset to avoid a request against a zero quota.
	if limit <= 0 {
		w, ok := rateLimit["reset"]
		if ok {
			switch w := w.(type) {
			case time.Time:
				waitUntil = w
				next, ok := getLimit("next", rateLimit, log)
				if !ok {
					return waitUntil
				}
				limiter.SetLimitAt(waitUntil, next)
				limiter.SetBurstAt(waitUntil, burst)
			case string:
				t, err := time.Parse(time.RFC3339, w)
				if err != nil {
					log.Errorw("unexpected value returned for rate limit reset", "value", w, "rate_limit", mapstr.M(rateLimit))
					return waitUntil
				}
				waitUntil = t
				next, ok := getLimit("next", rateLimit, log)
				if !ok {
					return waitUntil
				}
				limiter.SetLimitAt(waitUntil, next)
				limiter.SetBurstAt(waitUntil, burst)
			default:
				log.Errorw("unexpected type returned for rate limit reset", "type", reflect.TypeOf(w).String(), "rate_limit", mapstr.M(rateLimit))
			}
		}
		return waitUntil
	}
	limiter.SetLimit(limit)
	limiter.SetBurst(burst)
	return waitUntil
}
// getLimit extracts the named numeric rate limit field from the CEL
// program's rate_limit map, accepting int, int64, float64 and
// rate.Limit values, and the strings "inf"/"infinity" (optionally
// "+"-prefixed, case-insensitive) for an unlimited rate. ok is false
// when the field is missing or cannot be interpreted.
func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger) (limit rate.Limit, ok bool) {
	r, ok := rateLimit[which]
	if !ok {
		log.Errorw("rate limit missing "+which, "rate_limit", mapstr.M(rateLimit))
		return limit, false
	}
	switch r := r.(type) {
	case rate.Limit:
		limit = r
	case int:
		limit = rate.Limit(r)
	case int64:
		limit = rate.Limit(r)
	case float64:
		limit = rate.Limit(r)
	case string:
		if !strings.EqualFold(strings.TrimPrefix(r, "+"), "inf") && !strings.EqualFold(strings.TrimPrefix(r, "+"), "infinity") {
			log.Errorw("unexpected value returned for rate limit "+which, "value", r, "rate_limit", mapstr.M(rateLimit))
			return limit, false
		}
		limit = rate.Inf
	default:
		log.Errorw("unexpected type returned for rate limit "+which, "type", reflect.TypeOf(r).String(), "rate_limit", mapstr.M(rateLimit))
		// Report failure rather than claiming a valid zero limit, matching
		// the bad-string path above; previously this returned ok=true with
		// a zero limit, which would stall all requests until reset.
		return limit, false
	}
	return limit, true
}
// lumberjackTimestamp is a glob expression matching the time format string used
// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000". It is used
// to find and delete rotated trace-log and failure-dump files that this input
// owns when those features are configured but disabled.
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"
// newClient constructs the HTTP client used by the CEL program's
// requests, layering on digest authentication, request tracing, request
// metrics, redirect policy, retries, OAuth2 and a default user agent as
// configured. It also removes stale trace-log and failure-dump files
// when those features have a filename configured but are disabled.
// The returned LoggingRoundTripper is non-nil only when request
// tracing is enabled.
func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
	c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...)
	if err != nil {
		return nil, nil, err
	}

	if cfg.Auth.Digest.isEnabled() {
		var noReuse bool
		if cfg.Auth.Digest.NoReuse != nil {
			noReuse = *cfg.Auth.Digest.NoReuse
		}
		c.Transport = &digest.Transport{
			Transport: c.Transport,
			Username:  cfg.Auth.Digest.User,
			Password:  cfg.Auth.Digest.Password,
			NoReuse:   noReuse,
		}
	}

	var trace *httplog.LoggingRoundTripper
	if cfg.Resource.Tracer.enabled() {
		w := zapcore.AddSync(cfg.Resource.Tracer)
		go func() {
			// Close the logger when we are done.
			<-ctx.Done()
			cfg.Resource.Tracer.Close()
		}()
		core := ecszap.NewCore(
			ecszap.NewDefaultEncoderConfig(),
			w,
			zap.DebugLevel,
		)
		traceLogger := zap.New(core)
		maxBodyLen := cfg.Resource.Tracer.MaxSize * 1e6 / 10 // 10% of file max
		trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger, maxBodyLen, log)
		c.Transport = trace
	} else if cfg.Resource.Tracer != nil {
		// We have a trace log name, but we are not enabled,
		// so remove all trace logs we own.
		removeOwnedFiles(cfg.Resource.Tracer.Filename, "request trace log", log)
	}
	if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" {
		// We have a fail-dump name, but we are not enabled,
		// so remove all dumps we own. Previously these removals were
		// logged with "request trace log" messages copied from the
		// branch above; they now correctly identify the dump files.
		removeOwnedFiles(cfg.FailureDump.Filename, "failure dump file", log)
	}

	if reg != nil {
		c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg)
	}

	c.CheckRedirect = checkRedirect(cfg.Resource, log)

	if cfg.Resource.Retry.getMaxAttempts() > 1 {
		maxAttempts := cfg.Resource.Retry.getMaxAttempts()
		c = (&retryablehttp.Client{
			HTTPClient:   c,
			Logger:       newRetryLog(log),
			RetryWaitMin: cfg.Resource.Retry.getWaitMin(),
			RetryWaitMax: cfg.Resource.Retry.getWaitMax(),
			RetryMax:     maxAttempts,
			CheckRetry:   retryablehttp.DefaultRetryPolicy,
			Backoff:      retryablehttp.DefaultBackoff,
			ErrorHandler: retryErrorHandler(maxAttempts, log),
		}).StandardClient()
	}

	if cfg.Auth.OAuth2.isEnabled() {
		authClient, err := cfg.Auth.OAuth2.client(ctx, c)
		if err != nil {
			return nil, nil, err
		}
		return authClient, trace, nil
	}

	c.Transport = userAgentDecorator{
		UserAgent: userAgent,
		Transport: c.Transport,
	}
	return c, trace, nil
}

// removeOwnedFiles removes the file at name and any lumberjack-rotated
// siblings matching lumberjackTimestamp, logging failures using the
// given file description. Missing files are not reported.
func removeOwnedFiles(name, desc string, log *logp.Logger) {
	err := os.Remove(name)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		log.Errorw("failed to remove "+desc, "path", name, "error", err)
	}
	ext := filepath.Ext(name)
	base := strings.TrimSuffix(name, ext)
	paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
	if err != nil {
		log.Errorw("failed to collect "+desc+" path names", "error", err)
	}
	for _, p := range paths {
		err = os.Remove(p)
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			log.Errorw("failed to remove "+desc, "path", p, "error", err)
		}
	}
}
// wantClient reports whether the configuration requires an HTTP client:
// true when the resource scheme is http or https, possibly carrying a
// "+unix" or "+npipe" transport suffix.
func wantClient(cfg config) bool {
	scheme, _, _ := strings.Cut(cfg.Resource.URL.Scheme, "+")
	return scheme == "http" || scheme == "https"
}
// clientOptions returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
	base := []httpcommon.TransportOption{
		httpcommon.WithAPMHTTPInstrumentation(),
		keepalive,
	}
	scheme, trans, ok := strings.Cut(u.Scheme, "+")
	if !ok {
		return base
	}
	// We set the host for the unix socket and Windows named
	// pipes schemes because the http.Transport expects to
	// have a host and will error out if it is not present.
	// The values here are just non-zero with a helpful name.
	// They are not used in any logic.
	var dialer transport.Dialer
	switch trans {
	case "unix":
		u.Host = "unix-socket"
		dialer = socketDialer{u.Path}
	case "npipe":
		u.Host = "windows-npipe"
		dialer = npipeDialer{u.Path}
	default:
		// Unknown transport suffixes get the plain options and the
		// URL is left untouched.
		return base
	}
	u.Scheme = scheme
	return append(base, httpcommon.WithBaseDialer(dialer))
}
// socketDialer implements transport.Dialer to a constant socket path.
type socketDialer struct {
	// path is the filesystem path of the unix socket to dial.
	path string
}

// Dial connects to the dialer's fixed unix socket path. The network and
// address arguments are ignored.
func (d socketDialer) Dial(_, _ string) (net.Conn, error) {
	return net.Dial("unix", d.path)
}

// DialContext connects to the dialer's fixed unix socket path, honoring
// ctx for cancellation. The network and address arguments are ignored.
func (d socketDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
	var nd net.Dialer
	return nd.DialContext(ctx, "unix", d.path)
}
// checkRedirect returns a redirect policy function for the HTTP client
// that enforces the configured maximum number of redirects and, when
// enabled, forwards the previous request's headers minus any banned
// header names.
func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
	return func(req *http.Request, via []*http.Request) error {
		log.Debug("http client: checking redirect")
		if len(via) >= cfg.RedirectMaxRedirects {
			log.Debug("http client: max redirects exceeded")
			return fmt.Errorf("stopped after %d redirects", cfg.RedirectMaxRedirects)
		}
		if !cfg.RedirectForwardHeaders || len(via) == 0 {
			log.Debugf("http client: nothing to do while checking redirects - forward_headers: %v, via: %#v", cfg.RedirectForwardHeaders, via)
			return nil
		}
		// Carry the previous hop's headers forward, dropping any that
		// are configured as banned.
		last := via[len(via)-1]
		log.Debugf("http client: forwarding headers from previous request: %#v", last.Header)
		forwarded := last.Header.Clone()
		for _, name := range cfg.RedirectHeadersBanList {
			log.Debugf("http client: ban header %v", name)
			forwarded.Del(name)
		}
		req.Header = forwarded
		return nil
	}
}
// retryErrorHandler returns a retryablehttp.ErrorHandler that will log retry resignation
// but return the last retry attempt's response and a nil error so that the CEL code
// can evaluate the response status itself. Any error passed to the retryablehttp.ErrorHandler
// is returned unaltered. Despite not being documented so, the error handler may be passed
// a nil resp. retryErrorHandler will handle this case.
func retryErrorHandler(maxRetries int, log *logp.Logger) retryablehttp.ErrorHandler {
	return func(resp *http.Response, err error, numTries int) (*http.Response, error) {
		switch {
		case resp == nil || resp.Request == nil:
			log.Warnw("giving up retries: no response available", "retries", maxRetries+1)
		default:
			reqURL := "unavailable"
			if u := resp.Request.URL; u != nil {
				reqURL = u.String()
			}
			log.Warnw("giving up retries", "method", resp.Request.Method, "url", reqURL, "retries", maxRetries+1)
		}
		return resp, err
	}
}
// userAgentDecorator is an http.RoundTripper that injects a default
// User-Agent header into requests that do not already carry one.
type userAgentDecorator struct {
	// UserAgent is the value set when the request has no User-Agent.
	UserAgent string
	// Transport is the wrapped round tripper that performs the request.
	Transport http.RoundTripper
}

// RoundTrip sets the decorator's User-Agent on r when the caller has not
// provided one, and then delegates to the wrapped transport.
func (d userAgentDecorator) RoundTrip(r *http.Request) (*http.Response, error) {
	_, present := r.Header["User-Agent"]
	if !present {
		r.Header.Set("User-Agent", d.UserAgent)
	}
	return d.Transport.RoundTrip(r)
}
// newRateLimiterFromConfig builds the client-side rate limiter from the
// resource configuration. In the absence of configuration the limiter
// is effectively unlimited: infinite rate with a burst of one.
func newRateLimiterFromConfig(cfg *ResourceConfig) *rate.Limiter {
	limit, burst := rate.Inf, 1
	if cfg == nil || cfg.RateLimit == nil {
		return rate.NewLimiter(limit, burst)
	}
	if l := cfg.RateLimit.Limit; l != nil {
		limit = rate.Limit(*l)
	}
	if b := cfg.RateLimit.Burst; b != nil {
		burst = *b
	}
	return rate.NewLimiter(limit, burst)
}
// regexpsFromConfig compiles the configured named regular expressions
// for use by the CEL program. It returns nil with no error when no
// patterns are configured; the first compilation failure is returned.
func regexpsFromConfig(cfg config) (map[string]*regexp.Regexp, error) {
	if len(cfg.Regexps) == 0 {
		return nil, nil
	}
	patterns := make(map[string]*regexp.Regexp, len(cfg.Regexps))
	for name, expr := range cfg.Regexps {
		re, err := regexp.Compile(expr)
		if err != nil {
			return nil, err
		}
		patterns[name] = re
	}
	return patterns, nil
}
var (
	// mimetypes holds supported MIME type mappings, used by the CEL
	// file and mime extension functions to select a decoder.
	mimetypes = map[string]interface{}{
		"application/gzip":         func(r io.Reader) (io.Reader, error) { return gzip.NewReader(r) },
		"application/x-ndjson":     lib.NDJSON,
		"application/zip":          lib.Zip,
		"text/csv; header=absent":  lib.CSVNoHeader,
		"text/csv; header=present": lib.CSVHeader,
		// Include the undocumented space-less syntax to head off typo-related
		// user issues.
		//
		// TODO: Consider changing the MIME type look-ups to a formal parser
		// rather than a simple map look-up.
		"text/csv;header=absent":  lib.CSVNoHeader,
		"text/csv;header=present": lib.CSVHeader,
	}

	// limitPolicies are the provided rate limit policy helpers.
	limitPolicies = map[string]lib.LimitPolicy{
		"okta":  lib.OktaRateLimit,
		"draft": lib.DraftRateLimit,
	}
)
// getEnv returns the process environment filtered down to the allowed
// variable names. Malformed environment entries (no "=") and variables
// not in allowed are omitted; a nil or empty allowed list yields an
// empty map.
func getEnv(allowed []string) map[string]string {
	env := make(map[string]string)
	for _, kv := range os.Environ() {
		if k, v, ok := strings.Cut(kv, "="); ok && slices.Contains(allowed, k) {
			env[k] = v
		}
	}
	return env
}
// newProgram compiles the CEL source src into an executable cel.Program.
// The environment provides the mito extension libraries (collections,
// crypto, JSON, XML with the given XSD hints, printf, strings, time, try,
// debug, file/MIME handling, HTTP via client and httpOptions, rate limit
// policies, and optionally the configured regexp patterns), and declares a
// dynamic variable named root for the evaluation state. vars is exposed to
// the program as the "env" global. If coverage is true, a lib.Coverage is
// returned that accumulates AST coverage across evaluations; if details is
// true, the program tracks evaluation state so failures can be dumped.
func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, httpOptions lib.HTTPOptions, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details, coverage bool) (cel.Program, *cel.Ast, *lib.Coverage, error) {
	xml, err := lib.XML(nil, xsd)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
	}
	opts := []cel.EnvOption{
		cel.Declarations(decls.NewVar(root, decls.Dyn)),
		cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)),
		lib.Collections(),
		lib.Crypto(),
		lib.JSON(nil),
		xml,
		lib.Printf(),
		lib.Strings(),
		lib.Time(),
		lib.Try(),
		lib.Debug(debug(log, trace)),
		lib.File(mimetypes),
		lib.MIME(mimetypes),
		lib.HTTPWithContextOpts(ctx, client, httpOptions),
		lib.Limit(limitPolicies),
		lib.Globals(map[string]interface{}{
			"useragent": userAgent,
			"env":       vars,
		}),
	}
	if len(patterns) != 0 {
		opts = append(opts, lib.Regexp(patterns))
	}
	env, err := cel.NewEnv(opts...)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to create env: %w", err)
	}
	ast, iss := env.Compile(src)
	if iss.Err() != nil {
		return nil, nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
	}
	var (
		progOpts []cel.ProgramOption
		cov      *lib.Coverage
	)
	if coverage {
		cov = lib.NewCoverage(ast)
		progOpts = append(progOpts, cov.ProgramOption())
	}
	if details {
		// Append rather than assign so that enabling details does not
		// silently discard the coverage program option when both details
		// and coverage are requested; previously cov was returned but
		// never populated in that case.
		progOpts = append(progOpts, cel.EvalOptions(cel.OptTrackState))
	}
	prg, err := env.Program(ast, progOpts...)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed program instantiation: %w", err)
	}
	return prg, ast, cov, nil
}
// debug returns a logging function suitable for lib.Debug. Messages are
// always emitted at debug level on the "cel_debug" logger; the message
// text is "ERROR" when the logged value is an error and "DEBUG" otherwise.
// When an HTTP trace logger is available its transaction ID is attached
// to each entry so CEL debug output can be correlated with request logs.
func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, any) {
	log = log.Named("cel_debug")
	return func(tag string, value any) {
		level := "DEBUG"
		if _, isErr := value.(error); isErr {
			level = "ERROR"
		}
		if trace != nil {
			log.Debugw(level, "tag", tag, "value", value, "transaction.id", trace.TxID())
			return
		}
		log.Debugw(level, "tag", tag, "value", value)
	}
}
// evalWith evaluates the compiled CEL program prg against state, binding
// state to the program's root variable and shadowing the global "now"
// with the provided time. On success the program result is converted to
// a map[string]interface{} via protobuf struct conversion. On any
// failure, the input state is returned instead with state["events"] set
// to an error message object and "want_more" cleared, alongside the
// error; if details is true, evaluation errors additionally carry a dump
// of the evaluation state.
func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) {
	out, det, err := prg.ContextEval(ctx, map[string]interface{}{
		// Replace global program "now" with current time. This is necessary
		// as the lib.Time now global is static at program instantiation time
		// which will persist over multiple evaluations. The lib.Time behaviour
		// is correct for mito where CEL program instances live for only a
		// single evaluation. Rather than incurring the cost of creating a new
		// cel.Program for each evaluation, shadow lib.Time's now with a new
		// value for each eval. We retain the lib.Time now global for
		// compatibility between CEL programs developed in mito with programs
		// run in the input.
		"now": now,
		root: state,
	})
	if err != nil {
		// Decorate the error with source context, and the evaluation
		// state dump when details were requested.
		err = lib.DecoratedError{AST: ast, Err: err}
		if details {
			err = dumpError{error: err, dump: lib.NewDump(ast, det)}
		}
	}
	// A context cancellation takes precedence over any evaluation error.
	if e := ctx.Err(); e != nil {
		err = e
	}
	if err != nil {
		state["events"] = errorMessage(fmt.Sprintf("failed eval: %v", err))
		clearWantMore(state)
		return state, fmt.Errorf("failed eval: %w", err)
	}
	// Convert the CEL result to a protobuf Struct so it can be rendered
	// as a plain Go map.
	v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil)))
	if err != nil {
		state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err))
		clearWantMore(state)
		return state, fmt.Errorf("failed proto conversion: %w", err)
	}
	switch v := v.(type) {
	case *structpb.Struct:
		return v.AsMap(), nil
	default:
		// This should never happen.
		errMsg := fmt.Sprintf("unexpected native conversion type: %T", v)
		state["events"] = errorMessage(errMsg)
		clearWantMore(state)
		return state, errors.New(errMsg)
	}
}
// dumpError is an evaluation state dump associated with an error. It
// wraps the original evaluation error and carries the state dump built
// from the AST and evaluation details so it can be written out for
// debugging via writeToFile.
type dumpError struct {
	error
	dump *lib.Dump
}
// writeToFile serializes the error message and its evaluation state dump
// as a JSON object to the file at path, creating any missing parent
// directories with owner-only permissions.
func (e dumpError) writeToFile(path string) (err error) {
	// report is the on-disk shape of the dump.
	type report struct {
		Error string          `json:"error"`
		State []lib.NodeValue `json:"state"`
	}
	if err = os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
		return err
	}
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		// Fold sync/close failures into the returned error without
		// masking an encoding error.
		err = errors.Join(err, f.Sync(), f.Close())
	}()
	enc := json.NewEncoder(f)
	enc.SetEscapeHTML(false)
	return enc.Encode(report{
		Error: e.Error(),
		State: e.dump.NodeValues(),
	})
}
// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
// client CEL code may expect the want_more field to be present.
func clearWantMore(state map[string]interface{}) {
	_, present := state["want_more"]
	if present {
		state["want_more"] = false
	}
}
// errorMessage wraps msg in the {"error": {"message": msg}} shape used to
// surface failures in the evaluation state's events.
func errorMessage(msg string) map[string]interface{} {
	inner := map[string]interface{}{"message": msg}
	return map[string]interface{}{"error": inner}
}
// retryLog is a shim for the retryablehttp.Client.Logger.
type retryLog struct{ log *logp.Logger }

// newRetryLog returns a retryLog over log, named "retryablehttp".
// AddCallerSkip(1) makes the reported caller the retryablehttp call site
// rather than this shim's method.
func newRetryLog(log *logp.Logger) *retryLog {
	return &retryLog{log: log.Named("retryablehttp").WithOptions(zap.AddCallerSkip(1))}
}

// The leveled logging methods delegate to the corresponding structured
// logp methods.
func (l *retryLog) Error(msg string, kv ...interface{}) { l.log.Errorw(msg, kv...) }
func (l *retryLog) Info(msg string, kv ...interface{})  { l.log.Infow(msg, kv...) }
func (l *retryLog) Debug(msg string, kv ...interface{}) { l.log.Debugw(msg, kv...) }
func (l *retryLog) Warn(msg string, kv ...interface{})  { l.log.Warnw(msg, kv...) }
// test probes reachability of the host in url by attempting a TCP
// connection with a one second timeout. The port is taken from the URL
// when present, otherwise inferred from the scheme (443 for https, 80
// for everything else). It returns an error if the host is unreachable.
func test(url *url.URL) error {
	port := url.Port()
	if port == "" {
		switch url.Scheme {
		case "https":
			port = "443"
		default:
			port = "80"
		}
	}
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(url.Hostname(), port), time.Second)
	if err != nil {
		return fmt.Errorf("url %q is unreachable: %w", url, err)
	}
	// Close the probe connection; previously it was left open and leaked.
	return conn.Close()
}
// inputMetrics handles the input's metric reporting. Callers must call
// Close when the input stops to unregister the metrics.
type inputMetrics struct {
	unregister          func()             // removes the metrics from the registry; called by Close
	resource            *monitoring.String // URL-ish of input resource
	executions          *monitoring.Uint   // times the CEL program has been executed
	batchesReceived     *monitoring.Uint   // number of event arrays received
	eventsReceived      *monitoring.Uint   // number of events received
	batchesPublished    *monitoring.Uint   // number of event arrays published
	eventsPublished     *monitoring.Uint   // number of events published
	celProcessingTime   metrics.Sample     // histogram of the elapsed successful cel program processing times in nanoseconds
	batchProcessingTime metrics.Sample     // histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
}
// newInputMetrics creates and registers the input's metrics under the
// given input ID, returning the metrics handle and the registry they are
// registered in. The caller must call Close on the returned inputMetrics
// to unregister them when the input stops.
func newInputMetrics(id string) (*inputMetrics, *monitoring.Registry) {
	reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
	out := &inputMetrics{
		unregister:          unreg,
		resource:            monitoring.NewString(reg, "resource"),
		executions:          monitoring.NewUint(reg, "cel_executions"),
		batchesReceived:     monitoring.NewUint(reg, "batches_received_total"),
		eventsReceived:      monitoring.NewUint(reg, "events_received_total"),
		batchesPublished:    monitoring.NewUint(reg, "batches_published_total"),
		eventsPublished:     monitoring.NewUint(reg, "events_published_total"),
		celProcessingTime:   metrics.NewUniformSample(1024),
		batchProcessingTime: metrics.NewUniformSample(1024),
	}
	// Expose the processing time samples as histograms via the go-metrics
	// adapter; registration results are intentionally discarded.
	_ = adapter.NewGoMetrics(reg, "cel_processing_time", adapter.Accept).
		Register("histogram", metrics.NewHistogram(out.celProcessingTime))
	_ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept).
		Register("histogram", metrics.NewHistogram(out.batchProcessingTime))
	return out, reg
}
// Close removes the input's metrics from the monitoring registry.
func (m *inputMetrics) Close() {
	m.unregister()
}
// redactor implements lazy field redaction of sets of a mapstr.M. The
// redaction is applied only when the redactor is rendered via String, so
// the cost is not paid unless the value is actually logged.
type redactor struct {
	state mapstr.M // the state to render with redaction applied
	cfg   *redact  // redaction configuration; nil means render unredacted
}
// String renders the JSON corresponding to r.state after applying redaction
// operations. When no redaction is configured the state is rendered
// directly; otherwise a deep copy is redacted and rendered so the original
// state is never mutated.
func (r redactor) String() string {
	if r.cfg == nil || len(r.cfg.Fields) == 0 {
		return r.state.String()
	}
	redacted := make(mapstr.M, len(r.state))
	cloneMap(redacted, r.state)
	// Select the redaction operation once: either remove the field
	// entirely or replace its value with a mask.
	var apply func(parent mapstr.M, key string)
	if r.cfg.Delete {
		apply = func(parent mapstr.M, key string) {
			delete(parent, key)
		}
	} else {
		apply = func(parent mapstr.M, key string) {
			parent[key] = "*"
		}
	}
	for _, mask := range r.cfg.Fields {
		walkMap(redacted, mask, apply)
	}
	return redacted.String()
}
// cloneMap is an enhanced version of mapstr.M.Clone that handles cloning arrays
// within objects. Nested arrays are not handled. Map values (both mapstr.M
// and plain map[string]interface{}) and slices of either are deep-copied;
// all other values are copied by assignment.
func cloneMap(dst, src mapstr.M) {
	for key, val := range src {
		switch val := val.(type) {
		case mapstr.M:
			child := make(mapstr.M, len(val))
			cloneMap(child, val)
			dst[key] = child
		case map[string]interface{}:
			child := make(map[string]interface{}, len(val))
			cloneMap(child, val)
			dst[key] = child
		case []mapstr.M:
			clones := make([]mapstr.M, len(val))
			for i, m := range val {
				c := make(mapstr.M, len(m))
				cloneMap(c, m)
				clones[i] = c
			}
			dst[key] = clones
		case []map[string]interface{}:
			clones := make([]map[string]interface{}, len(val))
			for i, m := range val {
				c := make(map[string]interface{}, len(m))
				cloneMap(c, m)
				clones[i] = c
			}
			dst[key] = clones
		default:
			// Scalars and unhandled types are shared, not copied.
			dst[key] = val
		}
	}
}
// walkMap walks to all ends of the provided dotted path in m and applies fn
// to the final element of each walk, passing the containing map and the
// final key. Slice elements fan the walk out to each member map. Nested
// arrays are not handled. Paths that do not resolve are silently ignored.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
	key, rest, more := strings.Cut(path, ".")
	v, ok := m[key]
	if !ok {
		// Dead end: nothing at this path segment.
		return
	}
	if !more {
		// Reached the final segment; apply the operation here.
		fn(m, key)
		return
	}
	switch child := v.(type) {
	case mapstr.M:
		walkMap(child, rest, fn)
	case map[string]interface{}:
		walkMap(child, rest, fn)
	case []mapstr.M:
		for _, el := range child {
			walkMap(el, rest, fn)
		}
	case []map[string]interface{}:
		for _, el := range child {
			walkMap(el, rest, fn)
		}
	case []interface{}:
		for _, el := range child {
			switch el := el.(type) {
			case mapstr.M:
				walkMap(el, rest, fn)
			case map[string]interface{}:
				walkMap(el, rest, fn)
			}
		}
	}
}