in ingestor/adx/tasks.go [288:385]
// Run performs one reconciliation pass over every SummaryRule in storage that
// targets the current Kusto database. For each such rule it:
//
//  1. submits a new async summary operation when the rule is being deleted,
//     its last submission failed, its generation changed, or a full interval
//     has elapsed since the condition's LastTransitionTime;
//  2. reconciles the rule's tracked async operations against the operations
//     Kusto currently reports: operations Kusto no longer lists or that have
//     completed are dropped, and operations Kusto flags for retry are
//     resubmitted and the new operation is tracked in their place;
//  3. persists the rule status.
//
// Per-rule failures are logged and do not abort the pass. Run only returns an
// error when the rule list or the Kusto operation list cannot be fetched.
func (t *SummaryRuleTask) Run(ctx context.Context) error {
	summaryRules := &v1.SummaryRuleList{}
	if err := t.store.List(ctx, summaryRules); err != nil {
		return fmt.Errorf("failed to list summary rules: %w", err)
	}

	// Snapshot the async operations Kusto is currently tracking; each rule's
	// recorded operations are matched against this snapshot below.
	kustoAsyncOperations, err := t.GetOperations(ctx)
	if err != nil {
		return fmt.Errorf("failed to get async operations: %w", err)
	}

	for _, rule := range summaryRules.Items {
		if rule.Spec.Database != t.kustoCli.Database() {
			continue
		}

		cnd := rule.GetCondition()
		if cnd == nil {
			// First execution: pretend the previous run happened one interval ago
			// so the first submitted window spans exactly one interval.
			cnd = &metav1.Condition{
				LastTransitionTime: metav1.Time{Time: time.Now().Add(-rule.Spec.Interval.Duration)},
			}
		}

		shouldSubmitRule := rule.DeletionTimestamp != nil || // Rule is being deleted
			cnd.Status == metav1.ConditionFalse || // Submission failed, so no async operation was created for this interval
			cnd.ObservedGeneration != rule.GetGeneration() || // A new version of this CRD was created
			time.Since(cnd.LastTransitionTime.Time) >= rule.Spec.Interval.Duration // It's time to execute for a new interval
		if shouldSubmitRule {
			// The window runs from the last transition up to now, both truncated
			// to the minute.
			asyncOp := v1.AsyncOperation{
				StartTime: cnd.LastTransitionTime.UTC().Truncate(time.Minute).Format(time.RFC3339Nano),
				EndTime:   time.Now().UTC().Truncate(time.Minute).Format(time.RFC3339Nano),
			}
			operationID, submitErr := t.SubmitRule(ctx, rule, asyncOp.StartTime, asyncOp.EndTime)
			if submitErr != nil {
				if statusErr := t.store.UpdateStatus(ctx, &rule, submitErr); statusErr != nil {
					logger.Errorf("Failed to record submission failure for summary rule %s: %v", rule.Name, statusErr)
				}
			} else {
				asyncOp.OperationId = operationID
				rule.SetAsyncOperation(asyncOp)
			}
		}

		// Classify every tracked operation first and apply the changes
		// afterwards, so the rule's operation list is never mutated while it is
		// being ranged over.
		var stale, retries []v1.AsyncOperation
		for _, op := range rule.GetAsyncOperations() {
			found, completed, retry := false, false, false
			// Fold every Kusto row for this operation into a single decision, so
			// duplicate rows cannot trigger duplicate resubmissions.
			for _, kustoOp := range kustoAsyncOperations {
				if kustoOp.OperationId != op.OperationId {
					continue
				}
				found = true
				completed = completed || IsKustoAsyncOperationStateCompleted(kustoOp.State)
				retry = retry || kustoOp.ShouldRetry != 0
			}

			tracked := v1.AsyncOperation{OperationId: op.OperationId, StartTime: op.StartTime, EndTime: op.EndTime}
			switch {
			case !found:
				// The operation has fallen out of the Kusto backlog window, so it
				// can no longer be polled.
				logger.Infof("Async operation %s for rule %s has fallen out of the Kusto backlog window, removing",
					op.OperationId, rule.Name)
				stale = append(stale, tracked)
			case retry:
				retries = append(retries, tracked)
			case completed:
				// We're done polling this async operation.
				stale = append(stale, tracked)
			}
			// Otherwise the operation is still in progress; poll it again next run.
		}

		for _, op := range stale {
			rule.RemoveAsyncOperation(op.OperationId)
		}
		for _, op := range retries {
			retryID, retryErr := t.SubmitRule(ctx, rule, op.StartTime, op.EndTime)
			if retryErr != nil {
				// Keep the original operation so the retry is attempted again on
				// the next run while Kusto still reports it.
				logger.Errorf("Failed to submit rule: %v", retryErr)
				continue
			}
			// Replace the failed operation with the resubmission so the new
			// operation is polled (and retried again if needed).
			rule.RemoveAsyncOperation(op.OperationId)
			rule.SetAsyncOperation(v1.AsyncOperation{OperationId: retryID, StartTime: op.StartTime, EndTime: op.EndTime})
		}

		// NOTE(review): when the submission above failed, this call runs with a
		// nil error right after UpdateStatus recorded the failure. Confirm that
		// UpdateStatus(nil) does not overwrite the ConditionFalse that the
		// "Submission failed" branch of shouldSubmitRule relies on.
		if err := t.store.UpdateStatus(ctx, &rule, nil); err != nil {
			logger.Errorf("Failed to update summary rule status: %v", err)
			// Not a lot we can do here, we'll end up just retrying next interval.
		}
	}
	return nil
}