in x-pack/filebeat/input/cel/input.go [138:551]
// run is the body of the CEL input. It builds the HTTP client, rate limiter
// and compiled CEL program from src.cfg, seeds the program state with
// cfg.State, the persisted cursor (if any) and the resource URL, and then uses
// periodically with cfg.Interval to repeatedly evaluate the program and
// publish the events it returns through pub.
//
// Within a single periodic run the program is re-evaluated while the returned
// state has "want_more" set to true; the number of such continuations is
// bounded by *cfg.MaxExecutions. Input health is reported via env.UpdateStatus.
//
// Context cancellation or deadline expiry is treated as a normal stop and is
// reported as a nil error; any other error terminates the input.
func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
	cfg := src.cfg
	log := env.Logger.With("input_url", cfg.Resource.URL)

	metrics, reg := newInputMetrics(env.ID)
	defer metrics.Close()

	ctx := ctxtool.FromCanceller(env.Cancelation)

	if cfg.Resource.Tracer != nil {
		id := sanitizeFileName(env.IDWithoutName)
		cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
	}

	client, trace, err := newClient(ctx, cfg, log, reg)
	if err != nil {
		return err
	}

	limiter := newRateLimiterFromConfig(cfg.Resource)

	patterns, err := regexpsFromConfig(cfg)
	if err != nil {
		return err
	}

	var auth *lib.BasicAuth
	if cfg.Auth.Basic.isEnabled() {
		auth = &lib.BasicAuth{
			Username: cfg.Auth.Basic.User,
			Password: cfg.Auth.Basic.Password,
		}
	}
	wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
	doCov := cfg.RecordCoverage && log.IsDebug()
	httpOptions := lib.HTTPOptions{
		Limiter:     limiter,
		BasicAuth:   auth,
		Headers:     cfg.Resource.Headers,
		MaxBodySize: cfg.Resource.MaxBodySize,
	}
	prg, ast, cov, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, httpOptions, patterns, cfg.XSDs, log, trace, wantDump, doCov)
	if err != nil {
		return err
	}

	var state map[string]interface{}
	if cfg.State == nil {
		state = make(map[string]interface{})
	} else {
		state = cfg.State
	}
	if cursor != nil {
		state["cursor"] = cursor
	}
	// goodCursor is the last cursor known to have been correctly published.
	// Between batches it is kept equal to cursor.
	goodCursor := cursor
	goodURL := cfg.Resource.URL.String()
	state["url"] = goodURL
	metrics.resource.Set(goodURL)
	env.UpdateStatus(status.Running, "")
	// On entry, state is expected to be in the shape:
	//
	// {
	//     "url": <resource address>,
	//     "cursor": { ... },
	//     ...
	// }
	//
	// The url field must be present and can be an HTTP end-point
	// or a file path. It is currently the responsibility of the
	// program to handle removing the scheme from a file url if it
	// is present. The url may be mutated during execution of the
	// program but the mutated state will not be persisted between
	// restarts and the url must be present in the returned value
	// to ensure that it is available in the next evaluation unless
	// the program has the resource address hard-coded in or it is
	// available from the cursor.
	//
	// Additional fields may be present at the root of the object
	// and if the program tolerates it, the cursor value may be
	// absent. Only the cursor is persisted over restarts, but
	// all fields in state are retained between iterations of
	// the processing loop except for the produced events array,
	// see discussion below.
	//
	// If the cursor is present the program should perform and
	// process requests based on its value. If cursor is not
	// present the program must have alternative logic to
	// determine what requests to make.
	//
	// In addition to this and the functions and globals available
	// from mito/lib, a global, useragent, is available to use
	// in requests.
	err = periodically(ctx, cfg.Interval, func() error {
		log.Info("process repeated request")
		var (
			budget    = *cfg.MaxExecutions
			waitUntil time.Time
		)
		// Keep track of whether CEL is degraded. This is reassigned after
		// every evaluation below, so it reflects the most recent evaluation
		// and the handling of the events it returned.
		var isDegraded bool
		if doCov {
			defer func() {
				// If doCov is true, log the updated coverage details.
				// Updates are a running aggregate for each call to run
				// as cov is shared via the program compilation.
				log.Debugw("coverage", "details", cov.Details())
			}()
		}
		for {
			if wait := time.Until(waitUntil); wait > 0 {
				// We have a special-case wait for when we have a zero limit.
				// x/time/rate allow a burst through even when the limit is zero
				// so in order to ensure that we don't try until we are out of
				// purgatory we calculate how long we should wait according to
				// the retry after for a 429 and rate limit headers if we have
				// a zero rate quota. See handleResponse below.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(wait):
				}
			} else if err = ctx.Err(); err != nil {
				// Otherwise exit if we have been cancelled.
				return err
			}

			// Process a set of event requests.
			if trace != nil {
				log.Debugw("previous transaction", "transaction.id", trace.TxID())
			}
			log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			metrics.executions.Add(1)
			start := i.now().In(time.UTC)
			state, err = evalWith(ctx, prg, ast, state, start, wantDump)
			log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			if err != nil {
				var dump dumpError
				switch {
				case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
					return err
				case errors.As(err, &dump):
					path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
					dir := filepath.Dir(path)
					base := filepath.Base(path)
					ext := filepath.Ext(base)
					prefix := strings.TrimSuffix(base, ext)
					path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
					log.Debugw("writing failure dump file", "path", path)
					if writeErr := dump.writeToFile(path); writeErr != nil {
						log.Errorw("failed to write failure dump", "path", path, "error", writeErr)
					}
				}
				// The evaluation error is not fatal: the returned state is
				// still processed below so that any error event is published.
				log.Errorw("failed evaluation", "error", err)
				env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
			}
			isDegraded = err != nil
			metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
			if trace != nil {
				log.Debugw("final transaction", "transaction.id", trace.TxID())
			}

			// On exit, state is expected to be in the shape:
			//
			// {
			//     "cursor": [
			//         {...},
			//         ...
			//     ],
			//     "events": [
			//         {...},
			//         ...
			//     ],
			//     "url": <resource address>,
			//     "status_code": <HTTP request status code if a network request>,
			//     "header": <HTTP response headers if a network request>,
			//     "rate_limit": <HTTP rate limit map if required by API>,
			//     "want_more": bool
			// }
			//
			// The "events" array must be present, but may be empty or null.
			// In the case of an error condition in the CEL program it is
			// acceptable to return a single object which will be wrapped as
			// an array below. It is the responsibility of the downstream
			// processor to handle this object correctly (which may be to drop
			// the event). The error event will also be logged.
			// If it is not empty, it must only have objects as elements.
			// Additional fields may be present at the root of the object.
			// The evaluation is repeated with the new state, after removing
			// the events field, if the "want_more" field is present and true
			// and a non-empty events array is returned.
			//
			// If cursor is present it must be either a single object or an
			// array with the same length as events; each element i of the
			// cursor array will be the details for obtaining the events at or
			// beyond event i in events. If the cursor is a single object it
			// will be the details for obtaining events after the last
			// event in the events array and will only be retained on
			// successful publication of all the events in the array.
			//
			// If rate_limit is present it should be a map with numeric fields
			// rate and burst. It may also have a string error field and
			// other fields which will be logged. If it has an error field
			// the rate and burst will not be used to set rate limit behaviour.
			//
			// The status code and rate_limit values may be omitted if they do
			// not contribute to control.
			//
			// The following details how a cursor array works:
			//
			// Result after request resulting in 5 events. Each c obtained with
			// an e points to the ~next e.
			//
			//  +----+  +----+                 +----+
			//  | e1 |  | c1 |                 | e1 |
			//  +----+  +----+                 +----+  +----+
			//  | e2 |  | c2 |                 | e2 | < | c1 |
			//  +----+  +----+                 +----+  +----+
			//  | e3 |  | c3 |                 | e3 | < | c2 |
			//  +----+  +----+       =>        +----+  +----+
			//  | e4 |  | c4 |                 | e4 | < | c3 |
			//  +----+  +----+                 +----+  +----+
			//  | e5 |  | c5 |                 | e5 | < | c4 |
			//  +----+  +----+                 +----+  +----+
			//                                 |next| < | c5 |
			//                                 +----+  +----+
			//
			// After a successful publication this will leave a single c and
			// an empty events array. So the next evaluation has a starting point.
			//
			// If the publication fails or execution is terminated at some
			// point during the events array, we may end up with, e.g.
			//
			//  +----+  +----+                 +----+  +----+
			//  | e3 |  | c3 |                 |next| < | c3 |
			//  +----+  +----+                 +----+  +----+
			//  | e4 |  | c4 |       =>
			//  +----+  +----+                  lost events
			//  | e5 |  | c5 |
			//  +----+  +----+
			//
			// At this point, the c3 cursor (or at worst the c2 cursor) has
			// been stored and we can continue from that point, recovering
			// the lost events and potentially re-requesting e3.

			var ok bool
			ok, waitUntil, err = handleResponse(log, state, limiter)
			if err != nil {
				return err
			}
			if !ok {
				continue
			}
			_, ok = state["url"]
			if !ok && goodURL != "" {
				state["url"] = goodURL
				log.Debugw("adding missing url from last valid value: state did not contain a url", "last_valid_url", goodURL)
			}

			e, ok := state["events"]
			if !ok {
				return errors.New("unexpected missing events array from evaluation")
			}
			var events []interface{}
			switch e := e.(type) {
			case []interface{}:
				if len(e) == 0 {
					// Nothing to publish. Clear any degraded status left
					// over from earlier evaluations if this one was clean;
					// otherwise an idle input would report Degraded forever.
					if !isDegraded {
						env.UpdateStatus(status.Running, "")
					}
					return nil
				}
				events = e
			case map[string]interface{}:
				if e == nil {
					if !isDegraded {
						env.UpdateStatus(status.Running, "")
					}
					return nil
				}
				log.Errorw("single event object returned by evaluation", "event", e)
				if err, ok := e["error"]; ok {
					env.UpdateStatus(status.Degraded, fmt.Sprintf("single event error object returned by evaluation: %s", mapstr.M{"error": err}))
				} else {
					env.UpdateStatus(status.Degraded, "single event object returned by evaluation")
				}
				isDegraded = true
				events = []interface{}{e}
				// Make sure the cursor is not updated.
				delete(state, "cursor")
			default:
				return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
			}

			// We have a non-empty batch of events to process.
			metrics.batchesReceived.Add(1)
			metrics.eventsReceived.Add(uint64(len(events)))

			// Drop events from state. If we fail during the publication,
			// we will re-request these events.
			delete(state, "events")

			// Get cursors if they exist.
			var (
				cursors      []interface{}
				singleCursor bool
			)
			if c, ok := state["cursor"]; ok {
				cursors, ok = c.([]interface{})
				if ok {
					if len(cursors) != len(events) {
						log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events))
						env.UpdateStatus(status.Degraded, "unexpected cursor list length")
						isDegraded = true
						// But try to continue.
						if len(cursors) < len(events) {
							cursors = nil
						}
					}
				} else {
					cursors = []interface{}{c}
					singleCursor = true
				}
			}
			// Drop old cursor from state. This will be replaced with
			// the current cursor object below; it is an array now.
			delete(state, "cursor")

			start = time.Now()
			var hadPublicationError bool
			// idx is deliberately not named i to avoid shadowing the receiver.
			for idx, e := range events {
				event, ok := e.(map[string]interface{})
				if !ok {
					return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
				}
				var pubCursor interface{}
				if cursors != nil {
					if singleCursor {
						// Only set the cursor for publication at the last event
						// when a single cursor object has been provided.
						if idx == len(events)-1 {
							goodCursor = cursor
							cursor, ok = cursors[0].(map[string]interface{})
							if !ok {
								return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[0])
							}
							pubCursor = cursor
						}
					} else {
						goodCursor = cursor
						cursor, ok = cursors[idx].(map[string]interface{})
						if !ok {
							return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[idx])
						}
						pubCursor = cursor
					}
				}
				err = pub.Publish(beat.Event{
					Timestamp: time.Now(),
					Fields:    event,
				}, pubCursor)
				if err != nil {
					hadPublicationError = true
					log.Errorw("error publishing event", "error", err)
					env.UpdateStatus(status.Degraded, "error publishing event: "+err.Error())
					isDegraded = true
					// Publication commonly fails because we are shutting
					// down; don't keep trying the rest of the batch then.
					if ctxErr := ctx.Err(); ctxErr != nil {
						return ctxErr
					}
					// We are lost, so fall back to goodCursor (the cursor
					// preceding this event) but continue with the events that
					// we have without advancing the cursor. This allows us to
					// potentially publish the events we have now, with a
					// fallback to the last guaranteed correctly published
					// cursor.
					cursors = nil
					continue
				}
				if idx == 0 {
					metrics.batchesPublished.Add(1)
				}
				metrics.eventsPublished.Add(1)

				err = ctx.Err()
				if err != nil {
					return err
				}
			}
			if !isDegraded {
				env.UpdateStatus(status.Running, "")
			}
			metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())

			if hadPublicationError {
				// Rewind cursor to the last correctly published cursor.
				// Otherwise cursor would still hold the cursor of the event
				// that failed to publish, and a later batch would promote it
				// to goodCursor, skipping the events we mean to re-request.
				cursor = goodCursor
			} else {
				// Advance the cursor to the final state if there was no error
				// during publications. This is needed to transition to the
				// next set of events.
				goodCursor = cursor
			}

			// Replace the last known good cursor.
			state["cursor"] = goodCursor

			if more, _ := state["want_more"].(bool); !more {
				return nil
			}

			// Check we have a remaining execution budget.
			budget--
			if budget <= 0 {
				log.Warnw("exceeding maximum number of CEL executions", "limit", *cfg.MaxExecutions)
				env.UpdateStatus(status.Degraded, "exceeding maximum number of CEL executions")
				return nil
			}
		}
	})
	switch {
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		log.Infof("input stopped because context was cancelled with: %v", err)
		err = nil
	}
	return err
}