func()

in internal/pkg/api/handleCheckin.go [260:400]


// ProcessRequest serves a single agent check-in long poll.
//
// It validates the request, persists any upgrade details reported by the
// agent, subscribes to the action dispatcher and the policy monitor, and
// records the initial check-in. If pending actions exist they are returned
// immediately. Otherwise it blocks until one of these happens:
//   - new actions are dispatched,
//   - the policy monitor emits a policy,
//   - the poll duration elapses, or
//   - the request context ends.
//
// The resulting CheckinResponse is written to w.
//
// start is the time the request was received. It is used to compute how much
// of the poll budget was already spent on setup. ver is forwarded unchanged to
// the check-in bulker (presumably the agent version — confirm against caller).
//
// The following errors are returned to the caller:
//   - validation, upgrade-detail, policy-subscription, pending-action and
//     policy-processing failures;
//   - a context error other than cancellation.
//
// When the request context is canceled, an empty response carrying the ack
// token is written instead.
func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error {
	// Every log line below already carries the agent ID through this logger
	// context. Do not add logger.AgentID again, or zerolog emits a duplicate key.
	zlog = zlog.With().
		Str(logger.AgentID, agent.Id).Logger()
	validated, err := ct.validateRequest(zlog, w, r, start, agent)
	if err != nil {
		return err
	}
	req := validated.req
	pollDuration := validated.dur
	rawMeta := validated.rawMeta
	rawComponents := validated.rawComp
	seqno := validated.seqno
	unhealthyReason := validated.unhealthyReason

	// Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin.
	// Older agents will communicate any issues with upgrades via the Ack endpoint.
	if err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails); err != nil {
		return fmt.Errorf("failed to update upgrade_details: %w", err)
	}

	// Subscribe to actions dispatcher
	aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
	defer ct.ad.Unsubscribe(zlog, aSub)
	actCh := aSub.Ch()

	// Use revision_idx=0 if any of the agent's outputs has no API key defined.
	// This forces the policy monitor to emit a new policy so the API keys are regenerated.
	revID := agent.PolicyRevisionIdx
	for _, output := range agent.Outputs {
		if output.APIKey == "" {
			revID = 0
			break
		}
	}

	// Subscribe to policy manager for changes on PolicyId > policyRev
	sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)
	if err != nil {
		return fmt.Errorf("subscribe policy monitor: %w", err)
	}
	defer func() {
		err := ct.pm.Unsubscribe(sub)
		if err != nil {
			zlog.Error().Err(err).Str(logger.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy")
		}
	}()

	// Periodically refresh the check-in timestamp while the long poll is held open.
	tick := time.NewTicker(ct.cfg.Timeouts.CheckinTimestamp)
	defer tick.Stop()

	setupDuration := time.Since(start)
	pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter)

	zlog.Debug().
		Str("status", string(req.Status)).
		Str("seqNo", seqno.String()).
		Dur("setupDuration", setupDuration).
		Dur("jitter", jitter).
		Dur("pollDuration", pollDuration).
		Msg("checkin start long poll")

	// Chill out for a bit. Long poll.
	// The loop below exits on the first expiry, so a one-shot timer is enough.
	// Unlike a ticker, a timer does not panic on a non-positive duration.
	longPoll := time.NewTimer(pollDuration)
	defer longPoll.Stop()

	// Initial update on checkin, and any user fields that might have changed
	// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
	// 8.16.x releases would incorrectly set unenrolled_at
	err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason, agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != "")
	if err != nil {
		// Best effort: a failed timestamp update must not fail the check-in.
		zlog.Error().Err(err).Msg("checkin failed")
	}

	// Initial fetch for pending actions
	var (
		actions  []Action
		ackToken string
	)

	// Check agent pending actions first
	pendingActions, err := ct.fetchAgentPendingActions(r.Context(), seqno, agent.Id)
	if err != nil {
		return err
	}
	pendingActions = filterActions(zlog, agent.Id, pendingActions)
	actions, ackToken = convertActions(zlog, agent.Id, pendingActions)

	span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")

	// Only long-poll when there is nothing to deliver right away.
	if len(actions) == 0 {
	LOOP:
		for {
			select {
			case <-ctx.Done():
				// End the span before writing the response, matching the
				// normal exit path after the loop.
				span.End()
				// If the request context is canceled, the API server is shutting down.
				// We want to immediately stop the long-poll and return a 200 with the ackToken and no actions.
				if errors.Is(ctx.Err(), context.Canceled) {
					resp := CheckinResponse{
						AckToken: &ackToken,
						Action:   "checkin",
					}
					return ct.writeResponse(zlog, w, r, agent, resp)
				}
				return ctx.Err()
			case acdocs := <-actCh:
				var acs []Action
				acdocs = filterActions(zlog, agent.Id, acdocs)
				acs, ackToken = convertActions(zlog, agent.Id, acdocs)
				actions = append(actions, acs...)
				break LOOP
			case policy := <-sub.Output():
				actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)
				if err != nil {
					span.End()
					return fmt.Errorf("processPolicy: %w", err)
				}
				actions = append(actions, *actionResp)
				break LOOP
			case <-longPoll.C:
				zlog.Trace().Msg("fire long poll")
				break LOOP
			case <-tick.C:
				// Refresh the check-in without metadata or seqno (both passed as nil).
				err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
				if err != nil {
					zlog.Error().Err(err).Msg("checkin failed")
				}
			}
		}
	}
	span.End()

	resp := CheckinResponse{
		AckToken: &ackToken,
		Action:   "checkin",
		Actions:  &actions,
	}

	return ct.writeResponse(zlog, w, r, agent, resp)
}