ingestor/adx/tasks.go (360 lines of code) (raw):

package adx

import (
	"context"
	ERRS "errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	v1 "github.com/Azure/adx-mon/api/v1"
	"github.com/Azure/adx-mon/ingestor/storage"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/azure-kusto-go/kusto"
	"github.com/Azure/azure-kusto-go/kusto/data/errors"
	"github.com/Azure/azure-kusto-go/kusto/kql"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TableDetail is one row of the projected `.show tables details` output
// issued by DropUnusedTablesTask.loadTableDetails.
type TableDetail struct {
	TableName       string  `kusto:"TableName"`
	HotExtentSize   float64 `kusto:"HotExtentSize"`
	TotalExtentSize float64 `kusto:"TotalExtentSize"`
	TotalExtents    int64   `kusto:"TotalExtents"`
	HotRowCount     int64   `kusto:"HotRowCount"`
	TotalRowCount   int64   `kusto:"TotalRowCount"`
}

// DropUnusedTablesTask tracks tables that repeatedly report zero rows and,
// once a table has been seen empty on enough runs, reports it as a drop
// candidate. The actual drop statement is currently disabled (dry run).
type DropUnusedTablesTask struct {
	// mu serializes Run so unusedTables is never mutated concurrently.
	mu sync.Mutex
	// unusedTables maps a table name to the number of runs in which the
	// table reported zero total rows.
	unusedTables map[string]int
	kustoCli     StatementExecutor
	// database is not populated by NewDropUnusedTablesTask; the database name
	// is read from kustoCli instead. The field is retained so that any code
	// in this package that sets it keeps compiling.
	database string
}

// StatementExecutor is the subset of a Kusto client used by the tasks in this
// file: the database and endpoint it is bound to, and the ability to execute
// management commands.
type StatementExecutor interface {
	Database() string
	Endpoint() string
	Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}

// NewDropUnusedTablesTask returns a DropUnusedTablesTask that inspects tables
// through kustoCli.
func NewDropUnusedTablesTask(kustoCli StatementExecutor) *DropUnusedTablesTask {
	return &DropUnusedTablesTask{
		unusedTables: make(map[string]int),
		kustoCli:     kustoCli,
	}
}

// Run loads the table details, bumps the "seen empty" counter of every table
// with zero rows, clears the counter of every table that has rows, and logs
// (dry run) every table whose counter exceeds two. It returns an error only
// when the table details cannot be loaded.
func (t *DropUnusedTablesTask) Run(ctx context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	details, err := t.loadTableDetails(ctx)
	if err != nil {
		return fmt.Errorf("error loading table details: %w", err)
	}

	// The struct's database field is never set, so ask the client directly;
	// otherwise the log line below prints an empty database name.
	database := t.kustoCli.Database()

	for _, v := range details {
		switch {
		case v.TotalRowCount > 0:
			delete(t.unusedTables, v.TableName)
		case v.TotalRowCount == 0:
			t.unusedTables[v.TableName]++
			logger.Infof("Marking table %s.%s as unused", database, v.TableName)
		}
	}

	// Deleting the current key while ranging over a map is safe in Go.
	for table, count := range t.unusedTables {
		if count <= 2 {
			continue
		}
		logger.Infof("DRYRUN Dropping unused table %s.%s", database, table)
		// stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(".drop table %s", table))
		// if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
		// 	return fmt.Errorf("error dropping table %s: %w", table, err)
		// }
		delete(t.unusedTables, table)
	}

	return nil
}

// loadTableDetails runs `.show tables details` and decodes every row into a
// TableDetail. On a row or decode error it returns the rows decoded so far
// together with the error.
func (t *DropUnusedTablesTask) loadTableDetails(ctx context.Context) ([]TableDetail, error) {
	stmt := kusto.NewStmt(".show tables details | project TableName, HotExtentSize, TotalExtentSize, TotalExtents, HotRowCount, TotalRowCount")
	rows, err := t.kustoCli.Mgmt(ctx, stmt)
	if err != nil {
		return nil, err
	}
	// Release the iterator on every exit path, matching the other readers in
	// this file.
	defer rows.Stop()

	var tables []TableDetail
	for {
		row, err1, err2 := rows.NextRowOrError()
		if err2 == io.EOF {
			return tables, nil
		} else if err1 != nil {
			return tables, err1
		} else if err2 != nil {
			return tables, err2
		}

		var v TableDetail
		if err := row.ToStruct(&v); err != nil {
			return tables, err
		}
		tables = append(tables, v)
	}
}

// SyncFunctionsTask applies the functions held in store to the Kusto database
// served by kustoCli and records the outcome on each function's status.
type SyncFunctionsTask struct {
	store    storage.Functions
	kustoCli StatementExecutor
}

// NewSyncFunctionsTask returns a SyncFunctionsTask reading from store and
// executing against kustoCli.
func NewSyncFunctionsTask(store storage.Functions, kustoCli StatementExecutor) *SyncFunctionsTask {
	return &SyncFunctionsTask{
		store:    store,
		kustoCli: kustoCli,
	}
}

// Run lists all functions and, for each one targeting this database (or all
// databases), either drops it when it is being deleted or (re)applies it when
// the endpoint, status or generation indicates it is out of date. Failures of
// individual functions are logged and recorded on the function; only a failure
// to list functions is returned.
func (t *SyncFunctionsTask) Run(ctx context.Context) error {
	functions, err := t.store.List(ctx)
	if err != nil {
		return fmt.Errorf("failed to list functions: %w", err)
	}

	for _, function := range functions {
		if function.Spec.Database != v1.AllDatabases && function.Spec.Database != t.kustoCli.Database() {
			continue
		}

		if !function.DeletionTimestamp.IsZero() {
			// Until we can parse KQL we don't actually know the function's
			// name as described in this CRD; however, we'll make the assumption
			// that the CRD name is the same as the function name in Kusto and
			// attempt a delete.
			stmt := kql.New(".drop function ").AddUnsafe(function.Name).AddLiteral(" ifexists")
			if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
				logger.Errorf("Failed to delete function %s.%s: %v", function.Spec.Database, function.Name, err)
				// Deletion is best-effort, especially while we still can't parse KQL
			}
			if err := t.updateKQLFunctionStatus(ctx, function, v1.Success, nil); err != nil {
				logger.Errorf("Failed to update status after deletion: %v", err)
			}
			// Move on to the next function. Returning here would leave every
			// function after a deleted one unprocessed for this run.
			continue
		}

		// If endpoints have changed, or function is not in Success, re-apply
		upToDate := t.kustoCli.Endpoint() == function.Spec.AppliedEndpoint &&
			function.Status.Status == v1.Success &&
			function.GetGeneration() == function.Status.ObservedGeneration
		if upToDate {
			continue
		}

		stmt := kql.New(".execute database script with (ThrowOnErrors=true) <| ").AddUnsafe(function.Spec.Body)
		if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
			if !errors.Retry(err) {
				logger.Errorf("Permanent failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
				if updateErr := t.updateKQLFunctionStatus(ctx, function, v1.PermanentFailure, err); updateErr != nil {
					logger.Errorf("Failed to update permanent failure status: %v", updateErr)
				}
				continue
			}

			if updateErr := t.updateKQLFunctionStatus(ctx, function, v1.Failed, err); updateErr != nil {
				logger.Errorf("Failed to update transient failure status: %v", updateErr)
			}
			logger.Warnf("Transient failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
			continue
		}

		logger.Infof("Successfully created function %s.%s", function.Spec.Database, function.Name)
		if t.kustoCli.Endpoint() != function.Spec.AppliedEndpoint {
			function.Spec.AppliedEndpoint = t.kustoCli.Endpoint()
			if err := t.store.Update(ctx, function); err != nil {
				logger.Errorf("Failed to update function %s.%s: %v", function.Spec.Database, function.Name, err)
			}
		}

		if err := t.updateKQLFunctionStatus(ctx, function, v1.Success, nil); err != nil {
			logger.Errorf("Failed to update success status: %v", err)
		}
	}

	return nil
}

// updateKQLFunctionStatus sets fn's status to status and, when err is non-nil,
// stores a bounded error message on it before persisting through the store.
// When err is a Kusto HTTP error, the service's "@message" field is preferred
// over the full error text. It returns an error if the status cannot be
// persisted.
//
// NOTE(review): fn.Status.Error is left untouched when err is nil, so a stale
// message may remain after a later success — confirm whether that is intended.
func (t *SyncFunctionsTask) updateKQLFunctionStatus(ctx context.Context, fn *v1.Function, status v1.FunctionStatusEnum, err error) error {
	// maxErrLen bounds the error message stored on the status, in bytes.
	const maxErrLen = 256

	fn.Status.Status = status
	if err != nil {
		errMsg := err.Error()

		var kustoerr *errors.HttpError
		if ERRS.As(err, &kustoerr) {
			decoded := kustoerr.UnmarshalREST()
			if errMap, ok := decoded["error"].(map[string]interface{}); ok {
				if errMsgVal, ok := errMap["@message"].(string); ok {
					errMsg = errMsgVal
				}
			}
		}

		if len(errMsg) > maxErrLen {
			// A byte slice can cut a multi-byte rune in half; drop any
			// resulting invalid sequence so the stored message stays valid
			// UTF-8.
			errMsg = strings.ToValidUTF8(errMsg[:maxErrLen], "")
		}
		fn.Status.Error = errMsg
	}

	if err := t.store.UpdateStatus(ctx, fn); err != nil {
		return fmt.Errorf("failed to update status for function %s.%s: %w", fn.Spec.Database, fn.Name, err)
	}
	return nil
}

// ManagementCommandTask executes the management commands held in store
// against the Kusto cluster/database served by kustoCli.
type ManagementCommandTask struct {
	store    storage.CRDHandler
	kustoCli StatementExecutor
}

// NewManagementCommandsTask returns a ManagementCommandTask reading from store
// and executing against kustoCli.
func NewManagementCommandsTask(store storage.CRDHandler, kustoCli StatementExecutor) *ManagementCommandTask {
	return &ManagementCommandTask{
		store:    store,
		kustoCli: kustoCli,
	}
}

// Run lists management commands (using storage.FilterCompleted), executes each
// one that is cluster-scoped or scoped to this database, and records success
// or failure on the command's status. Only a failure to list commands is
// returned; per-command failures are logged and stored on the command.
func (t *ManagementCommandTask) Run(ctx context.Context) error {
	managementCommands := &v1.ManagementCommandList{}
	if err := t.store.List(ctx, managementCommands, storage.FilterCompleted); err != nil {
		return fmt.Errorf("failed to list management commands: %w", err)
	}

	for _, command := range managementCommands.Items {
		// ManagementCommands database is optional as not all commands are scoped at the database level
		if command.Spec.Database != "" && command.Spec.Database != t.kustoCli.Database() {
			continue
		}

		var stmt *kql.Builder
		if command.Spec.Database == "" {
			stmt = kql.New(".execute cluster script with (ThrowOnErrors = true) <|").AddUnsafe(command.Spec.Body)
		} else {
			stmt = kql.New(".execute database script with (ThrowOnErrors = true) <|").AddUnsafe(command.Spec.Body)
		}

		if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
			logger.Errorf("Failed to execute management command %s.%s: %v", command.Spec.Database, command.Name, err)
			if updateErr := t.store.UpdateStatus(ctx, &command, err); updateErr != nil {
				logger.Errorf("Failed to update management command status: %v", updateErr)
			}
			// Stop here for this command. Falling through would log a bogus
			// success and overwrite the failure status just written.
			continue
		}

		logger.Infof("Successfully executed management command %s.%s", command.Spec.Database, command.Name)
		if err := t.store.UpdateStatus(ctx, &command, nil); err != nil {
			logger.Errorf("Failed to update success status: %v", err)
		}
	}

	return nil
}

// SummaryRuleTask submits summary rules held in store to Kusto as async
// operations and tracks those operations.
type SummaryRuleTask struct {
	store    storage.CRDHandler
	kustoCli StatementExecutor
}

// NewSummaryRuleTask returns a SummaryRuleTask reading from store and
// executing against kustoCli.
func NewSummaryRuleTask(store storage.CRDHandler, kustoCli StatementExecutor) *SummaryRuleTask {
	return &SummaryRuleTask{
		store:    store,
		kustoCli: kustoCli,
	}
}

// KustoAsyncOperationState is the state of a Kusto async operation as compared
// against the State column read from `.show operations`.
type KustoAsyncOperationState string

const (
	// KustoAsyncOperationStateCompleted indicates that the async operation has completed successfully
	KustoAsyncOperationStateCompleted KustoAsyncOperationState = "Completed"
	// KustoAsyncOperationStateFailed indicates that the async operation has failed
	KustoAsyncOperationStateFailed KustoAsyncOperationState = "Failed"
	// KustoAsyncOperationStateInProgress indicates that the async operation is in progress
	KustoAsyncOperationStateInProgress KustoAsyncOperationState = "InProgress"
	// KustoAsyncOperationStateThrottled indicates that the async operation is being throttled
	KustoAsyncOperationStateThrottled KustoAsyncOperationState = "Throttled"
	// KustoAsyncOperationStateScheduled indicates that the async operation is scheduled
	KustoAsyncOperationStateScheduled KustoAsyncOperationState = "Scheduled"
)

// IsKustoAsyncOperationStateCompleted returns true if the async operation is completed
// (either succeeded or failed)
// This is used to determine if we should poll the async operation's status.
func IsKustoAsyncOperationStateCompleted(state string) bool {
	return state == string(KustoAsyncOperationStateCompleted) ||
		state == string(KustoAsyncOperationStateFailed)
}

// Run executes the SummaryRuleTask which manages summary rules and their associated
// Kusto async operations. It handles rule submission, operation tracking, and status updates.
func (t *SummaryRuleTask) Run(ctx context.Context) error { // Fetch all summary rules from storage summaryRules := &v1.SummaryRuleList{} if err := t.store.List(ctx, summaryRules); err != nil { return fmt.Errorf("failed to list summary rules: %w", err) } // Get the status of all async operations currently tracked in Kusto // to match against our rules' operations kustoAsyncOperations, err := t.GetOperations(ctx) if err != nil { return fmt.Errorf("failed to get async operations: %w", err) } // Process each summary rule individually for _, rule := range summaryRules.Items { // Skip rules not belonging to the current database if rule.Spec.Database != t.kustoCli.Database() { continue } // Get the current condition of the rule cnd := rule.GetCondition() if cnd == nil { // For first-time execution, initialize the condition with a timestamp // that's one interval back from current time cnd = &metav1.Condition{ LastTransitionTime: metav1.Time{Time: time.Now().Add(-rule.Spec.Interval.Duration)}, } } // Determine if the rule should be executed based on several criteria: // 1. The rule is being deleted // 2. Previous submission failed // 3. Rule has been updated (new generation) // 4. 
It's time for the next interval execution shouldSubmitRule := rule.DeletionTimestamp != nil || // Rule is being deleted cnd.Status == metav1.ConditionFalse || // Submission failed, so no async operation was created for this interval cnd.ObservedGeneration != rule.GetGeneration() || // A new version of this CRD was created time.Since(cnd.LastTransitionTime.Time) >= rule.Spec.Interval.Duration // It's time to execute for a new interval if shouldSubmitRule { // Prepare a new async operation with time range from last execution to now asyncOp := v1.AsyncOperation{ StartTime: cnd.LastTransitionTime.UTC().Truncate(time.Minute).Format(time.RFC3339Nano), EndTime: time.Now().UTC().Truncate(time.Minute).Format(time.RFC3339Nano), } operationId, err := t.SubmitRule(ctx, rule, asyncOp.StartTime, asyncOp.EndTime) if err != nil { t.store.UpdateStatus(ctx, &rule, err) } else { asyncOp.OperationId = operationId rule.SetAsyncOperation(asyncOp) } } // Process any outstanding async operations for this rule operations := rule.GetAsyncOperations() foundOperations := make(map[string]bool) for _, op := range operations { found := false for _, kustoOp := range kustoAsyncOperations { if op.OperationId == kustoOp.OperationId { found = true foundOperations[op.OperationId] = true if IsKustoAsyncOperationStateCompleted(kustoOp.State) { // We're done polling this async operation, so we can remove it from the list rule.RemoveAsyncOperation(kustoOp.OperationId) } if kustoOp.ShouldRetry != 0 { if _, err := t.SubmitRule(ctx, rule, op.StartTime, op.EndTime); err != nil { logger.Errorf("Failed to submit rule: %v", err) } } // Operation is still in progress, so we can skip it } // If the operation wasn't found in the Kusto operations list, // it has fallen out of the backlog window if !found { logger.Infof("Async operation %s for rule %s has fallen out of the Kusto backlog window, removing", op.OperationId, rule.Name) rule.RemoveAsyncOperation(op.OperationId) } } } if err := t.store.UpdateStatus(ctx, 
&rule, nil); err != nil { logger.Errorf("Failed to update summary rule status: %v", err) // Not a lot we can do here, we'll end up just retrying next interval. } } return nil } func (t *SummaryRuleTask) SubmitRule(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) { // NOTE: We cannot do something like `let _startTime = datetime();` as dot-command do not permit // preceding let-statements. rule.Spec.Body = strings.ReplaceAll(rule.Spec.Body, "_startTime", fmt.Sprintf("datetime(%s)", startTime)) rule.Spec.Body = strings.ReplaceAll(rule.Spec.Body, "_endTime", fmt.Sprintf("datetime(%s)", endTime)) // Execute asynchronously stmt := kql.New(".set-or-append async ").AddUnsafe(rule.Spec.Table).AddLiteral(" <| ").AddUnsafe(rule.Spec.Body) res, err := t.kustoCli.Mgmt(ctx, stmt) if err != nil { return "", fmt.Errorf("failed to execute summary rule %s.%s: %w", rule.Spec.Database, rule.Name, err) } return operationIDFromResult(res) } func (t *SummaryRuleTask) GetOperations(ctx context.Context) ([]AsyncOperationStatus, error) { // List all the async operations that have been executed in the last 24 hours. If one of our // async operations falls out of this window, it's time to stop trying that particular operation. 
stmt := kql.New(".show operations | where StartedOn > ago(1d) | where Operation == 'TableSetOrAppend' | summarize arg_max(LastUpdatedOn, OperationId, State, ShouldRetry) by OperationId | project LastUpdatedOn, OperationId = tostring(OperationId), State, ShouldRetry = todouble(ShouldRetry) | sort by LastUpdatedOn asc") rows, err := t.kustoCli.Mgmt(ctx, stmt) if err != nil { return nil, fmt.Errorf("failed to retrieve async operations: %w", err) } defer rows.Stop() var operations []AsyncOperationStatus for { row, errInline, errFinal := rows.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { continue } if errFinal != nil { return nil, fmt.Errorf("failed to retrieve async operations: %v", errFinal) } var status AsyncOperationStatus if err := row.ToStruct(&status); err != nil { return nil, fmt.Errorf("failed to parse async operation: %v", err) } if status.State == "" { continue } operations = append(operations, status) } return operations, nil } func operationIDFromResult(iter *kusto.RowIterator) (string, error) { defer iter.Stop() for { row, errInline, errFinal := iter.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { continue } if errFinal != nil { return "", fmt.Errorf("failed to retrieve operation ID: %v", errFinal) } if len(row.Values) != 1 { return "", fmt.Errorf("unexpected number of values in row: %d", len(row.Values)) } return row.Values[0].String(), nil } return "", nil } type AsyncOperationStatus struct { OperationId string `kusto:"OperationId"` LastUpdatedOn time.Time `kusto:"LastUpdatedOn"` State string `kusto:"State"` ShouldRetry float64 `kusto:"ShouldRetry"` }