in ingestor/adx/tasks.go [121:178]
// Run reconciles every function returned by the store against the Kusto
// database served by t.kustoCli.
//
// For each function:
//   - it is skipped when Spec.Database is neither v1.AllDatabases nor the
//     client's database;
//   - when it carries a deletion timestamp, a best-effort
//     ".drop function <name> ifexists" is issued and the status is recorded;
//   - otherwise its body is (re-)applied when the client endpoint differs from
//     Spec.AppliedEndpoint, when Status.Status is not v1.Success, or when the
//     generation differs from Status.ObservedGeneration.
//
// Only a failure to list functions is returned. Per-function failures are
// logged (and recorded through updateKQLFunctionStatus) so that one failing
// or deleted function never prevents the remaining ones from being processed.
func (t *SyncFunctionsTask) Run(ctx context.Context) error {
	functions, err := t.store.List(ctx)
	if err != nil {
		return fmt.Errorf("failed to list functions: %w", err)
	}

	for _, function := range functions {
		if function.Spec.Database != v1.AllDatabases && function.Spec.Database != t.kustoCli.Database() {
			continue
		}

		if !function.DeletionTimestamp.IsZero() {
			// Until we can parse KQL we don't actually know the function's
			// name as described in this CRD; however, we'll make the assumption
			// that the CRD name is the same as the function name in Kusto and
			// attempt a delete.
			stmt := kql.New(".drop function ").AddUnsafe(function.Name).AddLiteral(" ifexists")
			if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
				// Deletion is best-effort, especially while we still can't parse KQL
				logger.Errorf("Failed to delete function %s.%s: %v", function.Spec.Database, function.Name, err)
			}
			if err := t.updateKQLFunctionStatus(ctx, function, v1.Success, nil); err != nil {
				logger.Errorf("Failed to update status of deleted function %s.%s: %v", function.Spec.Database, function.Name, err)
			}
			// Move on to the next function; returning here would leave every
			// function after this one unreconciled for this pass.
			continue
		}

		// If endpoints have changed, or function is not in Success, or the
		// spec generation has not been observed yet, re-apply. Otherwise
		// there is nothing to do for this function.
		upToDate := t.kustoCli.Endpoint() == function.Spec.AppliedEndpoint &&
			function.Status.Status == v1.Success &&
			function.GetGeneration() == function.Status.ObservedGeneration
		if upToDate {
			continue
		}

		stmt := kql.New(".execute database script with (ThrowOnErrors=true) <| ").AddUnsafe(function.Spec.Body)
		if _, applyErr := t.kustoCli.Mgmt(ctx, stmt); applyErr != nil {
			if !errors.Retry(applyErr) {
				logger.Errorf("Permanent failure to create function %s.%s: %v", function.Spec.Database, function.Name, applyErr)
				if err := t.updateKQLFunctionStatus(ctx, function, v1.PermanentFailure, applyErr); err != nil {
					logger.Errorf("Failed to update permanent failure status: %v", err)
				}
				continue
			}

			logger.Warnf("Transient failure to create function %s.%s: %v", function.Spec.Database, function.Name, applyErr)
			if err := t.updateKQLFunctionStatus(ctx, function, v1.Failed, applyErr); err != nil {
				logger.Errorf("Failed to update transient failure status: %v", err)
			}
			continue
		}

		logger.Infof("Successfully created function %s.%s", function.Spec.Database, function.Name)

		// Record which endpoint the function was applied to, so that a later
		// endpoint change triggers a re-apply.
		if endpoint := t.kustoCli.Endpoint(); endpoint != function.Spec.AppliedEndpoint {
			function.Spec.AppliedEndpoint = endpoint
			if err := t.store.Update(ctx, function); err != nil {
				logger.Errorf("Failed to update function %s.%s: %v", function.Spec.Database, function.Name, err)
			}
		}

		if err := t.updateKQLFunctionStatus(ctx, function, v1.Success, nil); err != nil {
			logger.Errorf("Failed to update success status: %v", err)
		}
	}

	return nil
}