in internal/pkg/api/handleCheckin.go [260:400]
// ProcessRequest serves a single agent checkin as a long poll.
//
// The request is validated, any upgrade details in the body are processed,
// and the agent is subscribed to both the action dispatcher and the policy
// monitor. An initial checkin update is sent through ct.bc and the agent's
// pending actions are fetched. When there are no pending actions the call
// blocks until one of the following happens:
//
//   - the dispatcher delivers actions,
//   - the policy monitor emits a policy,
//   - the long-poll timer expires,
//   - the request context is done.
//
// While blocked, the checkin is refreshed every ct.cfg.Timeouts.CheckinTimestamp.
//
// Errors from validateRequest, processUpgradeDetails, the policy subscription,
// fetchAgentPendingActions, processPolicy and writeResponse are returned to
// the caller, as is a context error other than context.Canceled. Failures of
// ct.bc.CheckIn are only logged; they do not abort the checkin.
func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error {
	// Bind the agent ID once; every log line below inherits it, so it must not
	// be added again on individual events (zerolog does not de-duplicate keys).
	zlog = zlog.With().
		Str(logger.AgentID, agent.Id).Logger()
	validated, err := ct.validateRequest(zlog, w, r, start, agent)
	if err != nil {
		return err
	}
	req := validated.req
	pollDuration := validated.dur
	rawMeta := validated.rawMeta
	rawComponents := validated.rawComp
	seqno := validated.seqno
	unhealthyReason := validated.unhealthyReason

	// Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin.
	// Older agents will communicate any issues with upgrades via the Ack endpoint.
	if err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails); err != nil {
		return fmt.Errorf("failed to update upgrade_details: %w", err)
	}

	// Subscribe to actions dispatcher
	aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
	defer ct.ad.Unsubscribe(zlog, aSub)
	actCh := aSub.Ch()

	// Use revision_idx=0 if the agent has any output where no API key is defined.
	// This will force the policy monitor to emit a new policy to regenerate API keys.
	revID := agent.PolicyRevisionIdx
	for _, output := range agent.Outputs {
		if output.APIKey == "" {
			revID = 0
			break
		}
	}

	// Subscribe to policy manager for changes on PolicyId > policyRev
	sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)
	if err != nil {
		return fmt.Errorf("subscribe policy monitor: %w", err)
	}
	defer func() {
		err := ct.pm.Unsubscribe(sub)
		if err != nil {
			zlog.Error().Err(err).Str(logger.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy")
		}
	}()

	// Update check-in timestamp on timeout
	tick := time.NewTicker(ct.cfg.Timeouts.CheckinTimestamp)
	defer tick.Stop()

	// The time already spent setting up is handed to calcPollDuration together
	// with the requested poll duration and the configured jitter.
	setupDuration := time.Since(start)
	pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter)

	zlog.Debug().
		Str("status", string(req.Status)).
		Str("seqNo", seqno.String()).
		Dur("setupDuration", setupDuration).
		Dur("jitter", jitter).
		Dur("pollDuration", pollDuration).
		Msg("checkin start long poll")

	// Chill out for a bit. Long poll.
	// The loop below exits the first time this fires, so a one-shot timer is
	// enough; unlike a ticker it also tolerates a non-positive duration by
	// firing immediately instead of panicking.
	longPoll := time.NewTimer(pollDuration)
	defer longPoll.Stop()

	// Initial update on checkin, and any user fields that might have changed
	// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
	// 8.16.x releases would incorrectly set unenrolled_at
	err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason, agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != "")
	if err != nil {
		// Best effort: a failed checkin update is logged and the request continues.
		zlog.Error().Err(err).Msg("checkin failed")
	}

	// Initial fetch for pending actions
	var (
		actions  []Action
		ackToken string
	)

	// Check agent pending actions first
	pendingActions, err := ct.fetchAgentPendingActions(r.Context(), seqno, agent.Id)
	if err != nil {
		return err
	}
	pendingActions = filterActions(zlog, agent.Id, pendingActions)
	actions, ackToken = convertActions(zlog, agent.Id, pendingActions)

	// The span is ended explicitly on every exit path below.
	span, ctx := apm.StartSpan(r.Context(), "longPoll", "process")

	// Only wait when there is nothing to deliver right away.
	if len(actions) == 0 {
	LOOP:
		for {
			select {
			case <-ctx.Done():
				// This branch always returns, so the deferred End runs right
				// after the response (if any) has been written.
				defer span.End()
				// If the request context is canceled, the API server is shutting down.
				// We want to immediately stop the long-poll and return a 200 with the ackToken and no actions.
				if errors.Is(ctx.Err(), context.Canceled) {
					resp := CheckinResponse{
						AckToken: &ackToken,
						Action:   "checkin",
					}
					return ct.writeResponse(zlog, w, r, agent, resp)
				}
				return ctx.Err()
			case acdocs := <-actCh:
				// New actions arrived from the dispatcher; deliver them now.
				var acs []Action
				acdocs = filterActions(zlog, agent.Id, acdocs)
				acs, ackToken = convertActions(zlog, agent.Id, acdocs)
				actions = append(actions, acs...)
				break LOOP
			case policy := <-sub.Output():
				// The policy monitor emitted a policy; deliver the resulting action.
				actionResp, err := processPolicy(ctx, zlog, ct.bulker, agent.Id, policy)
				if err != nil {
					span.End()
					return fmt.Errorf("processPolicy: %w", err)
				}
				actions = append(actions, *actionResp)
				break LOOP
			case <-longPoll.C:
				// Poll duration elapsed with nothing to deliver.
				zlog.Trace().Msg("fire long poll")
				break LOOP
			case <-tick.C:
				// Periodic refresh while waiting: no metadata or seqno, and
				// the unenroll-cleanup flag is off.
				err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason, false)
				if err != nil {
					zlog.Error().Err(err).Msg("checkin failed")
				}
			}
		}
	}
	span.End()

	resp := CheckinResponse{
		AckToken: &ackToken,
		Action:   "checkin",
		Actions:  &actions,
	}
	return ct.writeResponse(zlog, w, r, agent, resp)
}