func()

in x-pack/filebeat/input/cel/input.go [138:551]


// run is the main processing loop of the CEL input.
//
// It builds the HTTP client, rate limiter, regular expressions and compiled
// CEL program from the source configuration. It then seeds the evaluation
// state from cfg.State, the persisted cursor and the configured resource URL.
// On every cfg.Interval tick driven by periodically, it evaluates the program
// and publishes the resulting events through pub together with their cursors.
// Evaluation is repeated within a tick while the result sets want_more and the
// cfg.MaxExecutions budget allows.
//
// Setup failures are returned directly. If periodically returns a context
// cancellation or deadline error, it is logged and nil is returned. Any other
// error returned by periodically is returned to the caller.
func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
	cfg := src.cfg
	log := env.Logger.With("input_url", cfg.Resource.URL)

	metrics, reg := newInputMetrics(env.ID)
	defer metrics.Close()

	ctx := ctxtool.FromCanceller(env.Cancelation)

	if cfg.Resource.Tracer != nil {
		// Expand the "*" placeholder in the trace file name with a
		// file-name-safe form of the input ID.
		id := sanitizeFileName(env.IDWithoutName)
		cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
	}

	client, trace, err := newClient(ctx, cfg, log, reg)
	if err != nil {
		return err
	}

	limiter := newRateLimiterFromConfig(cfg.Resource)

	patterns, err := regexpsFromConfig(cfg)
	if err != nil {
		return err
	}

	var auth *lib.BasicAuth
	if cfg.Auth.Basic.isEnabled() {
		auth = &lib.BasicAuth{
			Username: cfg.Auth.Basic.User,
			Password: cfg.Auth.Basic.Password,
		}
	}
	// Failure dumps are only requested when a destination file is configured.
	wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
	// Coverage is only recorded when it can be emitted at debug level.
	doCov := cfg.RecordCoverage && log.IsDebug()
	httpOptions := lib.HTTPOptions{
		Limiter:     limiter,
		BasicAuth:   auth,
		Headers:     cfg.Resource.Headers,
		MaxBodySize: cfg.Resource.MaxBodySize,
	}
	prg, ast, cov, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, httpOptions, patterns, cfg.XSDs, log, trace, wantDump, doCov)
	if err != nil {
		return err
	}

	var state map[string]interface{}
	if cfg.State == nil {
		state = make(map[string]interface{})
	} else {
		state = cfg.State
	}
	if cursor != nil {
		state["cursor"] = cursor
	}
	// goodCursor is the last cursor that is safe to hand back to the program.
	// It is written back into state after each batch of events.
	goodCursor := cursor
	// goodURL is the configured resource URL. It is re-inserted into state
	// when an evaluation result omits the url field.
	goodURL := cfg.Resource.URL.String()
	state["url"] = goodURL
	metrics.resource.Set(goodURL)
	env.UpdateStatus(status.Running, "")
	// On entry, state is expected to be in the shape:
	//
	// {
	//     "url": <resource address>,
	//     "cursor": { ... },
	//     ...
	// }
	//
	// The url field must be present and can be an HTTP end-point
	// or a file path. It is currently the responsibility of the
	// program to handle removing the scheme from a file url if it
	// is present. The url may be mutated during execution of the
	// program but the mutated state will not be persisted between
	// restarts and the url must be present in the returned value
	// to ensure that it is available in the next evaluation unless
	// the program has the resource address hard-coded in or it is
	// available from the cursor.
	//
	// Additional fields may be present at the root of the object
	// and if the program tolerates it, the cursor value may be
	// absent. Only the cursor is persisted over restarts, but
	// all fields in state are retained between iterations of
	// the processing loop except for the produced events array,
	// see discussion below.
	//
	// If the cursor is present the program should perform and
	// process requests based on its value. If cursor is not
	// present the program must have alternative logic to
	// determine what requests to make.
	//
	// In addition to this and the functions and globals available
	// from mito/lib, a global, useragent, is available to use
	// in requests.
	err = periodically(ctx, cfg.Interval, func() error {
		log.Info("process repeated request")
		var (
			budget    = *cfg.MaxExecutions
			waitUntil time.Time
		)
		// Keep track of whether CEL is degraded for this periodic run.
		var isDegraded bool
		if doCov {
			defer func() {
				// If doCov is true, log the updated coverage details.
				// Updates are a running aggregate for each call to run
				// as cov is shared via the program compilation.
				log.Debugw("coverage", "details", cov.Details())
			}()
		}
		for {
			if wait := time.Until(waitUntil); wait > 0 {
				// We have a special-case wait for when we have a zero limit.
				// x/time/rate allow a burst through even when the limit is zero
				// so in order to ensure that we don't try until we are out of
				// purgatory we calculate how long we should wait according to
				// the retry after for a 429 and rate limit headers if we have
				// a zero rate quota. See handleResponse below.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(wait):
				}
			} else if err = ctx.Err(); err != nil {
				// Otherwise exit if we have been cancelled.
				return err
			}

			// Process a set of event requests.
			if trace != nil {
				log.Debugw("previous transaction", "transaction.id", trace.TxID())
			}
			log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			metrics.executions.Add(1)
			start := i.now().In(time.UTC)
			state, err = evalWith(ctx, prg, ast, state, start, wantDump)
			log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
			if err != nil {
				var dump dumpError
				switch {
				case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
					return err
				case errors.As(err, &dump):
					// Write the failure dump to a timestamped sibling of the
					// configured file name, e.g. dir/name-<time>.ext.
					path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
					dir := filepath.Dir(path)
					base := filepath.Base(path)
					ext := filepath.Ext(base)
					prefix := strings.TrimSuffix(base, ext)
					path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
					log.Debugw("writing failure dump file", "path", path)
					// Deliberately shadow err so the evaluation error
					// is still the one logged and reported below.
					err := dump.writeToFile(path)
					if err != nil {
						log.Errorw("failed to write failure dump", "path", path, "error", err)
					}
				}
				log.Errorw("failed evaluation", "error", err)
				env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
			}
			// Degradation is re-evaluated on each execution so that a
			// successful evaluation allows the status to recover.
			isDegraded = err != nil
			metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
			if trace != nil {
				log.Debugw("final transaction", "transaction.id", trace.TxID())
			}

			// On exit, state is expected to be in the shape:
			//
			// {
			//     "cursor": [
			//         {...},
			//         ...
			//     ],
			//     "events": [
			//         {...},
			//         ...
			//     ],
			//     "url": <resource address>,
			//     "status_code": <HTTP request status code if a network request>,
			//     "header": <HTTP response headers if a network request>,
			//     "rate_limit": <HTTP rate limit map if required by API>,
			//     "want_more": bool
			// }
			//
			// The "events" array must be present, but may be empty or null.
			// In the case of an error condition in the CEL program it is
			// acceptable to return a single object which will be wrapped as
			// an array below. It is the responsibility of the downstream
			// processor to handle this object correctly (which may be to drop
			// the event). The error event will also be logged.
			// If it is not empty, it must only have objects as elements.
			// Additional fields may be present at the root of the object.
			// The evaluation is repeated with the new state, after removing
			// the events field, if the "want_more" field is present and true
			// and a non-empty events array is returned.
			//
			// If cursor is present it must be either a single object or an
			// array with the same length as events; each element i of the
			// cursor array will be the details for obtaining the events at or
			// beyond event i in events. If the cursor is a single object it
			// will be the details for obtaining events after the last
			// event in the events array and will only be retained on
			// successful publication of all the events in the array.
			//
			// If rate_limit is present it should be a map with numeric fields
			// rate and burst. It may also have a string error field and
			// other fields which will be logged. If it has an error field
			// the rate and burst will not be used to set rate limit behaviour.
			//
			// The status code and rate_limit values may be omitted if they do
			// not contribute to control.
			//
			// The following details how a cursor array works:
			//
			// Result after request resulting in 5 events. Each c obtained with
			// an e points to the ~next e.
			//
			//    +----+   +----+        +----+
			//    | e1 |   | c1 |        | e1 |
			//    +----+   +----+        +----+   +----+
			//    | e2 |   | c2 |        | e2 | < | c1 |
			//    +----+   +----+        +----+   +----+
			//    | e3 |   | c3 |        | e3 | < | c2 |
			//    +----+   +----+   =>   +----+   +----+
			//    | e4 |   | c4 |        | e4 | < | c3 |
			//    +----+   +----+        +----+   +----+
			//    | e5 |   | c5 |        | e5 | < | c4 |
			//    +----+   +----+        +----+   +----+
			//                           |next| < | c5 |
			//                           +----+   +----+
			//
			// After a successful publication this will leave a single c and
			// an empty events array, so the next evaluation has a starting point.
			//
			// If the publication fails or execution is terminated at some
			// point during the events array, we may end up with, e.g.
			//
			//    +----+  +----+        +----+   +----+
			//    | e3 |  | c3 |        |next| < | c3 |
			//    +----+  +----+        +----+   +----+
			//    | e4 |  | c4 |   =>
			//    +----+  +----+          lost events
			//    | e5 |  | c5 |
			//    +----+  +----+
			//
			// At this point, the c3 cursor (or at worst the c2 cursor) has
			// been stored and we can continue from that point, recovering
			// the lost events and potentially re-requesting e3.

			var ok bool
			ok, waitUntil, err = handleResponse(log, state, limiter)
			if err != nil {
				return err
			}
			if !ok {
				// Retry the evaluation; waitUntil may hold a backoff
				// deadline that is honoured at the top of the loop.
				continue
			}

			_, ok = state["url"]
			if !ok && goodURL != "" {
				state["url"] = goodURL
				log.Debugw("adding missing url from last valid value: state did not contain a url", "last_valid_url", goodURL)
			}

			e, ok := state["events"]
			if !ok {
				return errors.New("unexpected missing events array from evaluation")
			}
			var events []interface{}
			switch e := e.(type) {
			case []interface{}:
				if len(e) == 0 {
					return nil
				}
				events = e
			case map[string]interface{}:
				if e == nil {
					return nil
				}
				log.Errorw("single event object returned by evaluation", "event", e)
				if err, ok := e["error"]; ok {
					env.UpdateStatus(status.Degraded, fmt.Sprintf("single event error object returned by evaluation: %s", mapstr.M{"error": err}))
				} else {
					env.UpdateStatus(status.Degraded, "single event object returned by evaluation")
				}
				isDegraded = true
				events = []interface{}{e}
				// Make sure the cursor is not updated.
				delete(state, "cursor")
			default:
				return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
			}

			// We have a non-empty batch of events to process.
			metrics.batchesReceived.Add(1)
			metrics.eventsReceived.Add(uint64(len(events)))

			// Drop events from state. If we fail during the publication,
			// we will re-request these events.
			delete(state, "events")

			// Get cursors if they exist. A single cursor object is wrapped
			// in a one-element slice and flagged with singleCursor.
			var (
				cursors      []interface{}
				singleCursor bool
			)
			if c, ok := state["cursor"]; ok {
				cursors, ok = c.([]interface{})
				if ok {
					if len(cursors) != len(events) {
						log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events))
						env.UpdateStatus(status.Degraded, "unexpected cursor list length")
						isDegraded = true
						// But try to continue. Too few cursors cannot be
						// matched to events, so publish without cursors.
						if len(cursors) < len(events) {
							cursors = nil
						}
					}
				} else {
					cursors = []interface{}{c}
					singleCursor = true
				}
			}
			// Drop the evaluation's cursor from state; its value(s) are now
			// held in cursors and the last known good cursor is written
			// back below.
			delete(state, "cursor")

			start = time.Now()
			var (
				hadPublicationError bool
				// batchCounted records whether this batch has been counted
				// in batchesPublished. It is counted once, on the first
				// event that is published successfully, so a failure on
				// the first event does not hide a partially published batch.
				batchCounted bool
			)
			// The index is named idx rather than i to avoid shadowing the
			// method receiver.
			for idx, e := range events {
				event, ok := e.(map[string]interface{})
				if !ok {
					return fmt.Errorf("unexpected type returned for evaluation events: %T", e)
				}
				var pubCursor interface{}
				if cursors != nil {
					if singleCursor {
						// Only set the cursor for publication at the last event
						// when a single cursor object has been provided.
						if idx == len(events)-1 {
							goodCursor = cursor
							cursor, ok = cursors[0].(map[string]interface{})
							if !ok {
								return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[0])
							}
							pubCursor = cursor
						}
					} else {
						goodCursor = cursor
						cursor, ok = cursors[idx].(map[string]interface{})
						if !ok {
							return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[idx])
						}
						pubCursor = cursor
					}
				}
				err = pub.Publish(beat.Event{
					Timestamp: time.Now(),
					Fields:    event,
				}, pubCursor)
				if err != nil {
					hadPublicationError = true
					log.Errorw("error publishing event", "error", err)
					env.UpdateStatus(status.Degraded, "error publishing event: "+err.Error())
					isDegraded = true
					// We are lost, so stop advancing the cursor. The remaining
					// events are still published, but without cursors. This
					// leaves goodCursor as the fallback to the last guaranteed
					// correctly published cursor.
					cursors = nil
					continue
				}
				if !batchCounted {
					metrics.batchesPublished.Add(1)
					batchCounted = true
				}
				metrics.eventsPublished.Add(1)

				err = ctx.Err()
				if err != nil {
					return err
				}
			}

			if !isDegraded {
				env.UpdateStatus(status.Running, "")
			}

			metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())

			// Advance the cursor to the final state if there was no error during
			// publications. This is needed to transition to the next set of events.
			if !hadPublicationError {
				goodCursor = cursor
			}

			// Replace the last known good cursor.
			state["cursor"] = goodCursor

			if more, _ := state["want_more"].(bool); !more {
				return nil
			}

			// Check we have a remaining execution budget.
			budget--
			if budget <= 0 {
				log.Warnw("exceeding maximum number of CEL executions", "limit", *cfg.MaxExecutions)
				env.UpdateStatus(status.Degraded, "exceeding maximum number of CEL executions")
				return nil
			}
		}
	})
	switch {
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		log.Infof("input stopped because context was cancelled with: %v", err)
		err = nil
	}
	return err
}