func()

in ingestor/adx/tasks.go [288:385]


// Run performs one reconciliation pass over every SummaryRule in the store.
//
// For each rule whose Spec.Database matches the database of this task's Kusto
// client it:
//
//  1. Decides whether the rule has to be submitted (rule is being deleted, the
//     previous condition is False, the generation changed, or a full interval
//     has elapsed since the last transition) and, if so, calls SubmitRule for
//     the window [last transition, now], both truncated to the minute.
//  2. Compares the async operations tracked on the rule with the operations
//     returned by GetOperations: completed operations are removed, operations
//     flagged ShouldRetry are resubmitted, and operations Kusto no longer
//     reports are dropped.
//  3. Calls store.UpdateStatus for the rule, passing the submission error if
//     the submission in step 1 failed.
//
// Run returns an error only when listing the rules or fetching the Kusto
// operations fails. Per-rule failures are logged or handed to UpdateStatus and
// do not abort the pass.
func (t *SummaryRuleTask) Run(ctx context.Context) error {
	// Fetch all summary rules from storage.
	summaryRules := &v1.SummaryRuleList{}
	if err := t.store.List(ctx, summaryRules); err != nil {
		return fmt.Errorf("failed to list summary rules: %w", err)
	}

	// Get the status of all async operations currently tracked in Kusto
	// to match against our rules' operations.
	kustoAsyncOperations, err := t.GetOperations(ctx)
	if err != nil {
		return fmt.Errorf("failed to get async operations: %w", err)
	}

	for _, rule := range summaryRules.Items {
		// Skip rules not belonging to the current database.
		if rule.Spec.Database != t.kustoCli.Database() {
			continue
		}

		cnd := rule.GetCondition()
		if cnd == nil {
			// First-time execution: pretend the last transition happened one
			// interval ago so the interval check below fires immediately.
			cnd = &metav1.Condition{
				LastTransitionTime: metav1.Time{Time: time.Now().Add(-rule.Spec.Interval.Duration)},
			}
		}

		shouldSubmitRule := rule.DeletionTimestamp != nil || // Rule is being deleted
			cnd.Status == metav1.ConditionFalse || // Submission failed, so no async operation was created for this interval
			cnd.ObservedGeneration != rule.GetGeneration() || // A new version of this CRD was created
			time.Since(cnd.LastTransitionTime.Time) >= rule.Spec.Interval.Duration // It's time to execute for a new interval

		// submitErr remembers a failed submission so that it is the error
		// handed to the single UpdateStatus call at the end of this iteration.
		// Previously the error was passed to UpdateStatus immediately (with the
		// result ignored) and then followed by an unconditional
		// UpdateStatus(..., nil) for the same rule.
		// NOTE(review): assumes UpdateStatus derives the rule's condition from
		// the error it is given, so the later nil call masked the failure —
		// confirm against the store implementation.
		var submitErr error

		if shouldSubmitRule {
			// Time range runs from the last transition to now.
			asyncOp := v1.AsyncOperation{
				StartTime: cnd.LastTransitionTime.UTC().Truncate(time.Minute).Format(time.RFC3339Nano),
				EndTime:   time.Now().UTC().Truncate(time.Minute).Format(time.RFC3339Nano),
			}
			operationID, err := t.SubmitRule(ctx, rule, asyncOp.StartTime, asyncOp.EndTime)
			if err != nil {
				submitErr = err
			} else {
				asyncOp.OperationId = operationID
				rule.SetAsyncOperation(asyncOp)
			}
		}

		// Process any outstanding async operations for this rule.
		for _, op := range rule.GetAsyncOperations() {
			found := false
			for _, kustoOp := range kustoAsyncOperations {
				if op.OperationId != kustoOp.OperationId {
					continue
				}
				found = true

				if IsKustoAsyncOperationStateCompleted(kustoOp.State) {
					// We're done polling this async operation, so we can remove it from the list.
					rule.RemoveAsyncOperation(kustoOp.OperationId)
				}
				if kustoOp.ShouldRetry != 0 {
					// NOTE(review): the operation id returned by the
					// resubmission is discarded, so the retried operation is
					// not tracked on the rule — confirm this is intended.
					if _, err := t.SubmitRule(ctx, rule, op.StartTime, op.EndTime); err != nil {
						logger.Errorf("Failed to submit rule: %v", err)
					}
				}
				// Otherwise the operation is still in progress; nothing to do.
			}

			// Only after scanning the whole Kusto list can we conclude that
			// the operation is absent. Doing this check inside the scan would
			// drop the operation on the first non-matching entry and would
			// never run at all when Kusto returns no operations.
			if !found {
				logger.Infof("Async operation %s for rule %s has fallen out of the Kusto backlog window, removing",
					op.OperationId, rule.Name)
				rule.RemoveAsyncOperation(op.OperationId)
			}
		}

		if err := t.store.UpdateStatus(ctx, &rule, submitErr); err != nil {
			logger.Errorf("Failed to update summary rule status: %v", err)
			// Not a lot we can do here, we'll end up just retrying next interval.
		}
	}

	return nil
}