func()

in internal/pkg/api/handleAck.go [208:339]


// handleAckEvents processes the ack events sent by agent and returns a
// response holding one result per event, indexed by the event's position in
// events.
//
// Events are handled in three phases:
//  1. Each event is checked individually. Events whose action ID starts with
//     "policy:" are given an OK result and, when they carry no error, collected
//     for phase 2. All other events are matched to an action (cache first,
//     then dl.FindAction) and recorded through handleActionResult; error-free
//     acks of unenroll actions are additionally collected for phase 3.
//  2. The collected policy acks are applied with a single handlePolicyChange
//     call.
//  3. If at least one unenroll ack was collected, handleUnenroll is called
//     once.
//
// A failure in phase 2 or 3 overwrites the result of every event collected for
// that phase. The returned error is nil when no event produced a status above
// http.StatusOK; otherwise it is an *HTTPError carrying the numerically
// largest status recorded for any event. The response is returned in both
// cases.
func (ack *AckT) handleAckEvents(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, events []AckRequest_Events_Item) (AckResponse, error) {
	span, ctx := apm.StartSpan(ctx, "handleAckEvents", "process")
	defer span.End()

	var (
		policyAcks   []string // action IDs of error-free policy change acks
		policyIdxs   []int    // positions in events of the entries in policyAcks
		unenrollIdxs []int    // positions in events of error-free unenroll acks
	)

	res := NewAckResponse(len(events))

	// httpErr collects the largest HTTP status code from all acked events.
	httpErr := HTTPError{http.StatusOK}

	// setResult records status for the event at pos and raises the overall
	// status when this one is larger. The overall status is never lowered.
	setResult := func(pos, status int) {
		if status > httpErr.Status {
			httpErr.Status = status
		}
		res.SetResult(pos, status)
	}

	// setError records err for the event at pos. An Elasticsearch error
	// supplies its own status; any other error is reported as a 500. The error
	// is also sent to APM.
	setError := func(pos int, err error) {
		status := http.StatusInternalServerError
		var esErr *es.ErrElastic
		if errors.As(err, &esErr) {
			status = esErr.Status
		}
		setResult(pos, status)
		res.SetError(pos, err)
		e := apm.CaptureError(ctx, err)
		e.Send()
	}

	// processEvent handles the single event at position n. It is a function
	// of its own so that the per-event span is closed by defer on every exit
	// path instead of by a manual End call before each early exit.
	processEvent := func(n int, ev AckRequest_Events_Item) {
		event, decodeErr := ev.AsGenericEvent()

		evSpan, evCtx := apm.StartSpan(ctx, "ackEvent", "process")
		defer evSpan.End()
		// NOTE(review): agent.Agent is dereferenced without a nil check while
		// the mismatch check below uses agent.Id - confirm agent.Agent is
		// always populated when this handler runs.
		evSpan.Context.SetLabel("agent_id", agent.Agent.ID)
		evSpan.Context.SetLabel("action_id", event.ActionId)
		log := zlog.With().
			Str(logger.ActionID, event.ActionId).
			Str(logger.AgentID, event.AgentId).
			Time("timestamp", event.Timestamp).
			Int("n", n).Logger()
		log.Info().Msg("ack event")

		// A decode failure is surfaced instead of being dropped silently.
		// Processing still continues with whatever fields were decoded, so the
		// result reported for the event is the same as before.
		if decodeErr != nil {
			log.Warn().Err(decodeErr).Msg("ack event could not be fully decoded")
		}

		// Reject events that name a different agent than the caller.
		if event.AgentId != "" && event.AgentId != agent.Id {
			log.Error().Msg("agent id mismatch")
			setResult(n, http.StatusBadRequest)
			return
		}

		// Policy change acks are only collected here; they are applied in one
		// batch after all events have been looked at.
		if strings.HasPrefix(event.ActionId, "policy:") {
			if event.Error == nil {
				// only added if no error on action
				policyAcks = append(policyAcks, event.ActionId)
				policyIdxs = append(policyIdxs, n)
			}
			// Set OK status, this can be overwritten later if applying the
			// policy change acks fails.
			setResult(n, http.StatusOK)
			return
		}

		// Non-policy action: find the matching action by ID, cache first.
		vSpan, vCtx := apm.StartSpan(evCtx, "ackAction", "validate")
		action, ok := ack.cache.GetAction(event.ActionId)
		if !ok {
			actions, err := dl.FindAction(vCtx, ack.bulk, event.ActionId)
			if err != nil {
				log.Error().Err(err).Msg("find action")
				setError(n, err)
				vSpan.End()
				return
			}

			// Set 404 if action is not found. The agent can retry it later.
			if len(actions) == 0 {
				log.Error().Msg("no matching action")
				setResult(n, http.StatusNotFound)
				vSpan.End()
				return
			}
			action = actions[0]
			ack.cache.SetAction(action)
		}
		vSpan.End()

		if err := ack.handleActionResult(evCtx, zlog, agent, action, ev); err != nil {
			setError(n, err)
		} else {
			setResult(n, http.StatusOK)
		}

		// The unenroll itself is performed once, after the loop.
		if event.Error == nil && action.Type == TypeUnenroll {
			unenrollIdxs = append(unenrollIdxs, n)
		}
	}

	for n, ev := range events {
		processEvent(n, ev)
	}

	// Process policy acks
	if len(policyAcks) > 0 {
		if err := ack.handlePolicyChange(ctx, zlog, agent, policyAcks...); err != nil {
			// The batch failed as a whole, so every policy ack in it fails.
			for _, idx := range policyIdxs {
				setError(idx, err)
			}
		}
	}

	// Process unenroll acks
	if len(unenrollIdxs) > 0 {
		if err := ack.handleUnenroll(ctx, zlog, agent); err != nil {
			zlog.Error().Err(err).Msg("handle unenroll event")
			// Set errors for each unenroll event
			for _, idx := range unenrollIdxs {
				setError(idx, err)
			}
		}
	}

	// Return both the data and error code
	if httpErr.Status > http.StatusOK {
		return res, &httpErr
	}
	return res, nil
}