in internal/pkg/api/handleAck.go [208:339]
// handleAckEvents processes a batch of ack events sent by agent and returns
// the per-event results.
//
// Events are handled independently: a failure on one event is recorded in that
// event's slot of the response and does not stop the rest of the batch.
//
//   - An event whose agent ID is set and differs from agent.Id is rejected with
//     400.
//   - An event whose action ID starts with "policy:" is marked 200 right away;
//     if it carries no error its action ID is collected, and all collected
//     policy acks are passed to handlePolicyChange in one call after the loop.
//     If that call fails, every collected policy event is overwritten with the
//     error.
//   - Any other event is matched to an action (cache first, then dl.FindAction),
//     yielding 404 when no action exists, and is then passed to
//     handleActionResult.
//   - Events without an error whose action type is TypeUnenroll are collected
//     regardless of the handleActionResult outcome; if there is at least one,
//     handleUnenroll is called once after the loop and a failure is written to
//     each of those events.
//
// The response is always returned. The error is nil when no event produced a
// status above 200; otherwise it is an *HTTPError holding the numerically
// largest status recorded for any event.
func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, events []AckRequest_Events_Item) (AckResponse, error) {
	span, ctx := apm.StartSpan(ctx, "handleAckEvents", "process")
	defer span.End()

	var (
		policyAcks   []string // action IDs of error-free policy change acks
		policyIdxs   []int    // positions in events of the entries in policyAcks
		unenrollIdxs []int    // positions in events of error-free unenroll acks
	)

	res := NewAckResponse(len(events))

	// httpErr tracks the largest HTTP status code seen across all acked events.
	httpErr := HTTPError{http.StatusOK}

	// setResult records status for the event at pos and raises the overall
	// status if this one is larger.
	setResult := func(pos, status int) {
		if status > httpErr.Status {
			httpErr.Status = status
		}
		res.SetResult(pos, status)
	}

	// setError records err for the event at pos. The status comes from the
	// Elasticsearch error when err wraps one, and is 500 otherwise. The error
	// is also reported to APM using the function-level context.
	setError := func(pos int, err error) {
		status := http.StatusInternalServerError
		var esErr *es.ErrElastic
		if errors.As(err, &esErr) {
			status = esErr.Status
		}
		setResult(pos, status)
		res.SetError(pos, err)
		apm.CaptureError(ctx, err).Send()
	}

	// ackEvent handles the event at position n. It is a closure rather than an
	// inline loop body so the per-event span can be closed with defer on every
	// exit path.
	ackEvent := func(n int, ev AckRequest_Events_Item) {
		// NOTE(review): the decode error is discarded, so a failed decode
		// leaves event zero-valued or partially filled. Presumably the request
		// body is validated before it reaches this point - confirm.
		event, _ := ev.AsGenericEvent()

		evSpan, evCtx := apm.StartSpan(ctx, "ackEvent", "process")
		defer evSpan.End()
		// Label with agent.Id, the same identifier the mismatch check below
		// uses, so tracing does not depend on the nested agent.Agent metadata.
		evSpan.Context.SetLabel("agent_id", agent.Id)
		evSpan.Context.SetLabel("action_id", event.ActionId)

		evLog := zlog.With().
			Str(logger.ActionID, event.ActionId).
			Str(logger.AgentID, event.AgentId).
			Time("timestamp", event.Timestamp).
			Int("n", n).Logger()
		evLog.Info().Msg("ack event")

		// Check agent id mismatch
		if event.AgentId != "" && event.AgentId != agent.Id {
			evLog.Error().Msg("agent id mismatch")
			setResult(n, http.StatusBadRequest)
			return
		}

		// Policy change acks are only collected here; they are handled
		// together after all other actions.
		if strings.HasPrefix(event.ActionId, "policy:") {
			if event.Error == nil {
				// only added if no error on action
				policyAcks = append(policyAcks, event.ActionId)
				policyIdxs = append(policyIdxs, n)
			}
			// Set OK now; this is overwritten later if handling the policy
			// change acks fails.
			setResult(n, http.StatusOK)
			return
		}

		// Non-policy action: find the matching action by ID, cache first.
		// The validate span covers the cache lookup, the search and the cache
		// fill, and is closed in exactly one place.
		vSpan, vCtx := apm.StartSpan(evCtx, "ackAction", "validate")
		action, found := ack.cache.GetAction(event.ActionId)
		var lookupErr error
		if !found {
			actions, err := dl.FindAction(vCtx, ack.bulk, event.ActionId)
			lookupErr = err
			if err == nil && len(actions) > 0 {
				action, found = actions[0], true
				ack.cache.SetAction(action)
			}
		}
		vSpan.End()

		switch {
		case lookupErr != nil:
			evLog.Error().Err(lookupErr).Msg("find action")
			setError(n, lookupErr)
			return
		case !found:
			// Set 404 if action is not found. The agent can retry it later.
			evLog.Error().Msg("no matching action")
			setResult(n, http.StatusNotFound)
			return
		}

		if err := ack.handleActionResult(evCtx, zlog, agent, action, ev); err != nil {
			setError(n, err)
		} else {
			setResult(n, http.StatusOK)
		}

		// Collected even when handleActionResult failed; only an error
		// reported by the agent in the event itself excludes it.
		if event.Error == nil && action.Type == TypeUnenroll {
			unenrollIdxs = append(unenrollIdxs, n)
		}
	}

	for n, ev := range events {
		ackEvent(n, ev)
	}

	// Process policy acks
	if len(policyAcks) > 0 {
		if err := ack.handlePolicyChange(ctx, zlog, agent, policyAcks...); err != nil {
			for _, idx := range policyIdxs {
				setError(idx, err)
			}
		}
	}

	// Process unenroll acks
	if len(unenrollIdxs) > 0 {
		if err := ack.handleUnenroll(ctx, zlog, agent); err != nil {
			zlog.Error().Err(err).Msg("handle unenroll event")
			// Set errors for each unenroll event
			for _, idx := range unenrollIdxs {
				setError(idx, err)
			}
		}
	}

	// Return both the data and error code
	if httpErr.Status > http.StatusOK {
		return res, &httpErr
	}
	return res, nil
}