backend/operations_scanner.go (651 lines of code) (raw):

// Copyright 2025 Microsoft Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "bytes" "context" "errors" "fmt" "log/slog" "net/http" "os" "slices" "strconv" "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" ocmsdk "github.com/openshift-online/ocm-sdk-go" arohcpv1alpha1 "github.com/openshift-online/ocm-sdk-go/arohcp/v1alpha1" cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1" ocmerrors "github.com/openshift-online/ocm-sdk-go/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/Azure/ARO-HCP/internal/api/arm" "github.com/Azure/ARO-HCP/internal/database" "github.com/Azure/ARO-HCP/internal/ocm" ) const ( defaultSubscriptionConcurrency = 10 defaultPollIntervalSubscriptions = 10 * time.Minute defaultPollIntervalOperations = 10 * time.Second collectSubscriptionsLabel = "list_subscriptions" processSubscriptionsLabel = "process_subscriptions" processOperationsLabel = "process_operations" pollClusterOperationLabel = "poll_cluster" pollNodePoolOperationLabel = "poll_node_pool" ) // Copied from uhc-clusters-service, because the // OCM SDK does not define this for some reason. type NodePoolStateValue string const ( NodePoolStateValidating NodePoolStateValue = "validating" NodePoolStatePending NodePoolStateValue = "pending" NodePoolStateInstalling NodePoolStateValue = "installing" NodePoolStateReady NodePoolStateValue = "ready" NodePoolStateUpdating NodePoolStateValue = "updating" NodePoolStateValidatingUpdate NodePoolStateValue = "validating_update" NodePoolStatePendingUpdate NodePoolStateValue = "pending_update" NodePoolStateUninstalling NodePoolStateValue = "uninstalling" NodePoolStateRecoverableError NodePoolStateValue = "recoverable_error" NodePoolStateError NodePoolStateValue = "error" ) const ( InflightChecksFailedProvisionErrorCode = "OCM4001" ) type operation struct { id string pk azcosmos.PartitionKey doc *database.OperationDocument logger *slog.Logger } type OperationsScanner struct { dbClient database.DBClient lockClient *database.LockClient clusterService ocm.ClusterServiceClient notificationClient *http.Client subscriptionsLock sync.Mutex subscriptions []string subscriptionChannel chan string subscriptionWorkers sync.WaitGroup leaderGauge prometheus.Gauge workerGauge prometheus.Gauge operationsCount *prometheus.CounterVec operationsFailedCount *prometheus.CounterVec operationsDuration *prometheus.HistogramVec lastOperationTimestamp *prometheus.GaugeVec subscriptionsByState *prometheus.GaugeVec } func NewOperationsScanner(dbClient database.DBClient, ocmConnection *ocmsdk.Connection) *OperationsScanner { s := &OperationsScanner{ dbClient: dbClient, lockClient: dbClient.GetLockClient(), clusterService: ocm.ClusterServiceClient{Conn: ocmConnection}, notificationClient: http.DefaultClient, subscriptions: make([]string, 0), leaderGauge: promauto.With(prometheus.DefaultRegisterer).NewGauge( prometheus.GaugeOpts{ Name: "backend_leader_election_state", Help: "Leader election state (1 when leader).", }, ), workerGauge: promauto.With(prometheus.DefaultRegisterer).NewGauge( prometheus.GaugeOpts{ Name: "backend_workers", Help: "Number of concurrent workers.", }, ), operationsCount: promauto.With(prometheus.DefaultRegisterer).NewCounterVec( prometheus.CounterOpts{ Name: "backend_operations_total", Help: "Total count of operations.", }, []string{"type"}, ), operationsFailedCount: promauto.With(prometheus.DefaultRegisterer).NewCounterVec( prometheus.CounterOpts{ Name: "backend_failed_operations_total", Help: "Total count of failed operations.", }, []string{"type"}, ), operationsDuration: promauto.With(prometheus.DefaultRegisterer).NewHistogramVec( prometheus.HistogramOpts{ Name: "backend_operations_duration_seconds", Help: "Histogram of operation latencies.", Buckets: []float64{.25, .5, 1, 2, 5}, NativeHistogramBucketFactor: 1.1, NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"type"}, ), lastOperationTimestamp: promauto.With(prometheus.DefaultRegisterer).NewGaugeVec( prometheus.GaugeOpts{ Name: "backend_last_operation_timestamp_seconds", Help: "Timestamp of the last operation.", }, []string{"type"}, ), subscriptionsByState: promauto.With(prometheus.DefaultRegisterer).NewGaugeVec( prometheus.GaugeOpts{ Name: "backend_subscriptions", Help: "Number of subscriptions by state.", }, []string{"state"}, ), } // Initialize the counter and histogram metrics. for _, v := range []string{ collectSubscriptionsLabel, processSubscriptionsLabel, processOperationsLabel, pollClusterOperationLabel, pollNodePoolOperationLabel, } { s.operationsCount.WithLabelValues(v) s.operationsFailedCount.WithLabelValues(v) s.operationsDuration.WithLabelValues(v) s.lastOperationTimestamp.WithLabelValues(v) } for subscriptionState := range arm.ListSubscriptionStates() { s.subscriptionsByState.WithLabelValues(string(subscriptionState)) } return s } // getInterval parses an environment variable into a time.Duration value. // If the environment variable is not defined or its value is invalid, // getInternal returns defaultVal. func getInterval(envName string, defaultVal time.Duration, logger *slog.Logger) time.Duration { if intervalString, ok := os.LookupEnv(envName); ok { interval, err := time.ParseDuration(intervalString) if err == nil { return interval } else { logger.Warn(fmt.Sprintf("Cannot use %s: %v", envName, err.Error())) } } return defaultVal } // getPositiveInt parses an environment variable into a positive integer. // If the environment variable is not defined or its value is invalid, // getPositiveInt returns defaultVal. func getPositiveInt(envName string, defaultVal int, logger *slog.Logger) int { if intString, ok := os.LookupEnv(envName); ok { positiveInt, err := strconv.Atoi(intString) if err == nil && positiveInt <= 0 { err = errors.New("value must be positive") } if err == nil { return positiveInt } logger.Warn(fmt.Sprintf("Cannot use %s: %v", envName, err.Error())) } return defaultVal } // Run executes the main loop of the OperationsScanner. func (s *OperationsScanner) Run(ctx context.Context, logger *slog.Logger) { var interval time.Duration interval = getInterval("BACKEND_POLL_INTERVAL_SUBSCRIPTIONS", defaultPollIntervalSubscriptions, logger) logger.Info("Polling subscriptions in Cosmos DB every " + interval.String()) collectSubscriptionsTicker := time.NewTicker(interval) interval = getInterval("BACKEND_POLL_INTERVAL_OPERATIONS", defaultPollIntervalOperations, logger) logger.Info("Polling operations in Cosmos DB every " + interval.String()) processSubscriptionsTicker := time.NewTicker(interval) numWorkers := getPositiveInt("BACKEND_SUBSCRIPTION_CONCURRENCY", defaultSubscriptionConcurrency, logger) logger.Info(fmt.Sprintf("Processing %d subscriptions at a time", numWorkers)) s.workerGauge.Set(float64(numWorkers)) // Create a buffered channel using worker pool size as a heuristic. s.subscriptionChannel = make(chan string, numWorkers) defer close(s.subscriptionChannel) // In this worker pool, each worker processes all operations within // a single Azure subscription / Cosmos DB partition. for i := 0; i < numWorkers; i++ { go func() { defer s.subscriptionWorkers.Done() for subscriptionID := range s.subscriptionChannel { subscriptionLogger := logger.With("subscription_id", subscriptionID) s.withSubscriptionLock(ctx, subscriptionLogger, subscriptionID, func(ctx context.Context) { s.processOperations(ctx, subscriptionID, subscriptionLogger) }) } }() } s.subscriptionWorkers.Add(numWorkers) // Collect subscriptions immediately on startup. s.collectSubscriptions(ctx, logger) loop: for { select { case <-collectSubscriptionsTicker.C: s.collectSubscriptions(ctx, logger) case <-processSubscriptionsTicker.C: s.processSubscriptions(logger) case <-ctx.Done(): // break alone just breaks out of select. // Use a label to break out of the loop. break loop } } } // Join waits for the OperationsScanner to gracefully shut down. func (s *OperationsScanner) Join() { s.subscriptionWorkers.Wait() s.leaderGauge.Set(0) } func (s *OperationsScanner) updateOperationMetrics(label string) func() { startTime := time.Now() s.operationsCount.WithLabelValues(label).Inc() return func() { s.operationsDuration.WithLabelValues(label).Observe(time.Since(startTime).Seconds()) s.lastOperationTimestamp.WithLabelValues(label).SetToCurrentTime() } } // collectSubscriptions builds an internal list of Azure subscription IDs by // querying Cosmos DB. func (s *OperationsScanner) collectSubscriptions(ctx context.Context, logger *slog.Logger) { defer s.updateOperationMetrics(collectSubscriptionsLabel)() var subscriptions []string iterator := s.dbClient.ListAllSubscriptionDocs() subscriptionStates := map[arm.SubscriptionState]int{} for subscriptionState := range arm.ListSubscriptionStates() { subscriptionStates[subscriptionState] = 0 } for subscriptionID, subscription := range iterator.Items(ctx) { // Unregistered subscriptions should have no active operations, // not even deletes. if subscription.State != arm.SubscriptionStateUnregistered { subscriptions = append(subscriptions, subscriptionID) } subscriptionStates[subscription.State]++ } err := iterator.GetError() if err != nil { s.operationsFailedCount.WithLabelValues(collectSubscriptionsLabel).Inc() logger.Error(fmt.Sprintf("Error while paging through Cosmos query results: %v", err.Error())) return } s.subscriptionsLock.Lock() defer s.subscriptionsLock.Unlock() if len(subscriptions) != len(s.subscriptions) { logger.Info(fmt.Sprintf("Tracking %d active subscriptions", len(subscriptions))) } for k, v := range subscriptionStates { s.subscriptionsByState.WithLabelValues(string(k)).Set(float64(v)) } s.subscriptions = subscriptions } // processSubscriptions feeds the internal list of Azure subscription IDs // to the worker pool for processing. processSubscriptions may block if the // worker pool gets overloaded. The log will indicate if this occurs. func (s *OperationsScanner) processSubscriptions(logger *slog.Logger) { defer s.updateOperationMetrics(processSubscriptionsLabel)() // This method may block while feeding subscription IDs to the // worker pool, so take a clone of the subscriptions slice to // iterate over. s.subscriptionsLock.Lock() subscriptions := slices.Clone(s.subscriptions) s.subscriptionsLock.Unlock() for _, subscriptionID := range subscriptions { select { case s.subscriptionChannel <- subscriptionID: default: // The channel is full. Push the subscription anyway // but log how long we block for. This will indicate // when the worker pool size needs increased. start := time.Now() s.subscriptionChannel <- subscriptionID logger.Warn(fmt.Sprintf("Subscription processing blocked for %s", time.Since(start))) } } } // processOperations processes all operations in a single Azure subscription. func (s *OperationsScanner) processOperations(ctx context.Context, subscriptionID string, logger *slog.Logger) { defer s.updateOperationMetrics(processOperationsLabel)() pk := database.NewPartitionKey(subscriptionID) iterator := s.dbClient.ListActiveOperationDocs(pk, nil) for operationID, operationDoc := range iterator.Items(ctx) { operationLogger := logger.With( "operation", operationDoc.Request, "operation_id", operationID, "resource_id", operationDoc.ExternalID.String(), "internal_id", operationDoc.InternalID.String()) op := operation{operationID, pk, operationDoc, operationLogger} switch operationDoc.InternalID.Kind() { case arohcpv1alpha1.ClusterKind: switch operationDoc.Request { case database.OperationRequestRevokeCredentials: s.pollBreakGlassCredentialRevoke(ctx, op) default: s.pollClusterOperation(ctx, op) } case arohcpv1alpha1.NodePoolKind: s.pollNodePoolOperation(ctx, op) case cmv1.BreakGlassCredentialKind: s.pollBreakGlassCredential(ctx, op) } } err := iterator.GetError() if err != nil { s.operationsFailedCount.WithLabelValues(processOperationsLabel).Inc() logger.Error(fmt.Sprintf("Error while paging through Cosmos query results: %v", err.Error())) } } // pollClusterOperation updates the status of a cluster operation. func (s *OperationsScanner) pollClusterOperation(ctx context.Context, op operation) { defer s.updateOperationMetrics(pollClusterOperationLabel)() clusterStatus, err := s.clusterService.GetClusterStatus(ctx, op.doc.InternalID) if err != nil { var ocmError *ocmerrors.Error if errors.As(err, &ocmError) && ocmError.Status() == http.StatusNotFound && op.doc.Request == database.OperationRequestDelete { err = s.setDeleteOperationAsCompleted(ctx, op) if err != nil { op.logger.Error(fmt.Sprintf("Failed to handle a completed deletion: %v", err)) } } else { op.logger.Error(fmt.Sprintf("Failed to get cluster status: %v", err)) } s.operationsFailedCount.WithLabelValues(pollClusterOperationLabel).Inc() return } opStatus, opError, err := s.convertClusterStatus(ctx, op.logger, clusterStatus, op.doc.Status, op.doc.InternalID) if err != nil { s.operationsFailedCount.WithLabelValues(pollClusterOperationLabel).Inc() op.logger.Warn(err.Error()) return } err = s.updateOperationStatus(ctx, op, opStatus, opError) if err != nil { s.operationsFailedCount.WithLabelValues(pollClusterOperationLabel).Inc() op.logger.Error(fmt.Sprintf("Failed to update operation status: %v", err)) } } // pollNodePoolOperation updates the status of a node pool operation. func (s *OperationsScanner) pollNodePoolOperation(ctx context.Context, op operation) { defer s.updateOperationMetrics(pollNodePoolOperationLabel)() nodePoolStatus, err := s.clusterService.GetNodePoolStatus(ctx, op.doc.InternalID) if err != nil { var ocmError *ocmerrors.Error if errors.As(err, &ocmError) && ocmError.Status() == http.StatusNotFound && op.doc.Request == database.OperationRequestDelete { err = s.setDeleteOperationAsCompleted(ctx, op) if err != nil { op.logger.Error(fmt.Sprintf("Failed to handle a completed deletion: %v", err)) } } else { op.logger.Error(fmt.Sprintf("Failed to get node pool status: %v", err)) } s.operationsFailedCount.WithLabelValues(pollNodePoolOperationLabel).Inc() return } opStatus, opError, err := convertNodePoolStatus(nodePoolStatus, op.doc.Status) if err != nil { s.operationsFailedCount.WithLabelValues(pollNodePoolOperationLabel).Inc() op.logger.Warn(err.Error()) return } err = s.updateOperationStatus(ctx, op, opStatus, opError) if err != nil { s.operationsFailedCount.WithLabelValues(pollNodePoolOperationLabel).Inc() op.logger.Error(fmt.Sprintf("Failed to update operation status: %v", err)) } } // pollBreakGlassCredential updates the status of a credential creation operation. func (s *OperationsScanner) pollBreakGlassCredential(ctx context.Context, op operation) { breakGlassCredential, err := s.clusterService.GetBreakGlassCredential(ctx, op.doc.InternalID) if err != nil { op.logger.Error(fmt.Sprintf("Failed to get break-glass credential: %v", err)) return } var opStatus = op.doc.Status var opError *arm.CloudErrorBody switch status := breakGlassCredential.Status(); status { case cmv1.BreakGlassCredentialStatusCreated: opStatus = arm.ProvisioningStateProvisioning case cmv1.BreakGlassCredentialStatusFailed: // XXX Cluster Service does not provide a reason for the failure, // so we have no choice but to use a generic error message. opStatus = arm.ProvisioningStateFailed opError = &arm.CloudErrorBody{ Code: arm.CloudErrorCodeInternalServerError, Message: "Failed to provision cluster credential", } case cmv1.BreakGlassCredentialStatusIssued: opStatus = arm.ProvisioningStateSucceeded default: op.logger.Error(fmt.Sprintf("Unhandled BreakGlassCredentialStatus '%s'", status)) return } updated, err := s.dbClient.UpdateOperationDoc(ctx, op.pk, op.id, func(updateDoc *database.OperationDocument) bool { return updateDoc.UpdateStatus(opStatus, opError) }) if err != nil { op.logger.Error(fmt.Sprintf("Failed to update operation status: %v", err)) } if updated { op.logger.Info(fmt.Sprintf("Updated status to '%s'", opStatus)) s.maybePostAsyncNotification(ctx, op) } } // pollBreakGlassCredentialRevoke updates the status of a credential revocation operation. func (s *OperationsScanner) pollBreakGlassCredentialRevoke(ctx context.Context, op operation) { var opStatus = arm.ProvisioningStateSucceeded var opError *arm.CloudErrorBody // XXX Error handling here is tricky. Since the operation applies to multiple // Cluster Service objects, we can find a mix of successes and failures. // And with only a Failed status for each object, it's difficult to make // intelligent decisions like whether to retry. This is just to say the // error handling policy here may need revising once Cluster Service // offers more detail to accompany BreakGlassCredentialStatusFailed. iterator := s.clusterService.ListBreakGlassCredentials(op.doc.InternalID, "") loop: for breakGlassCredential := range iterator.Items(ctx) { // An expired credential is as good as a revoked credential // for this operation, regardless of the credential status. if breakGlassCredential.ExpirationTimestamp().After(time.Now()) { switch status := breakGlassCredential.Status(); status { case cmv1.BreakGlassCredentialStatusAwaitingRevocation: opStatus = arm.ProvisioningStateDeleting // break alone just breaks out of select. // Use a label to break out of the loop. break loop case cmv1.BreakGlassCredentialStatusRevoked: // maintain ProvisioningStateSucceeded case cmv1.BreakGlassCredentialStatusFailed: // XXX Cluster Service does not provide a reason for the failure, // so we have no choice but to use a generic error message. opStatus = arm.ProvisioningStateFailed opError = &arm.CloudErrorBody{ Code: arm.CloudErrorCodeInternalServerError, Message: "Failed to revoke cluster credential", } // break alone just breaks out of select. // Use a label to break out of the loop. break loop default: op.logger.Error(fmt.Sprintf("Unhandled BreakGlassCredentialStatus '%s'", status)) } } } err := iterator.GetError() if err != nil { op.logger.Error(fmt.Sprintf("Error while paging through Cluster Service query results: %v", err.Error())) return } updated, err := s.dbClient.UpdateOperationDoc(ctx, op.pk, op.id, func(updateDoc *database.OperationDocument) bool { return updateDoc.UpdateStatus(opStatus, opError) }) if err != nil { op.logger.Error(fmt.Sprintf("Failed to update operation status: %v", err)) } if updated { op.logger.Info(fmt.Sprintf("Updated status to '%s'", opStatus)) s.maybePostAsyncNotification(ctx, op) } } // withSubscriptionLock holds a subscription lock while executing the given function. // In the event the subscription lock is lost, the context passed to the function will // be canceled. func (s *OperationsScanner) withSubscriptionLock(ctx context.Context, logger *slog.Logger, subscriptionID string, fn func(ctx context.Context)) { timeout := s.lockClient.GetDefaultTimeToLive() lock, err := s.lockClient.AcquireLock(ctx, subscriptionID, &timeout) if err != nil { logger.Error(fmt.Sprintf("Failed to acquire lock: %v", err)) return } lockedCtx, stop := s.lockClient.HoldLock(ctx, lock) fn(lockedCtx) lock = stop() if lock != nil { nonFatalErr := s.lockClient.ReleaseLock(ctx, lock) if nonFatalErr != nil { // Failure here is non-fatal but still log the error. // The lock's TTL ensures it will be released eventually. logger.Warn(fmt.Sprintf("Failed to release lock: %v", nonFatalErr)) } } } // setDeleteOperationAsCompleted updates Cosmos DB to reflect a completed resource deletion. func (s *OperationsScanner) setDeleteOperationAsCompleted(ctx context.Context, op operation) error { err := s.dbClient.DeleteResourceDoc(ctx, op.doc.ExternalID) if err != nil { return err } // Save a final "succeeded" operation status until TTL expires. const opStatus arm.ProvisioningState = arm.ProvisioningStateSucceeded updated, err := s.dbClient.UpdateOperationDoc(ctx, op.pk, op.id, func(updateDoc *database.OperationDocument) bool { return updateDoc.UpdateStatus(opStatus, nil) }) if err != nil { return err } if updated { op.logger.Info("Deletion completed") s.maybePostAsyncNotification(ctx, op) } return nil } // updateOperationStatus updates Cosmos DB to reflect an updated resource status. func (s *OperationsScanner) updateOperationStatus(ctx context.Context, op operation, opStatus arm.ProvisioningState, opError *arm.CloudErrorBody) error { updated, err := s.dbClient.UpdateOperationDoc(ctx, op.pk, op.id, func(updateDoc *database.OperationDocument) bool { return updateDoc.UpdateStatus(opStatus, opError) }) if err != nil { return err } if updated { op.logger.Info(fmt.Sprintf("Updated status to '%s'", opStatus)) s.maybePostAsyncNotification(ctx, op) } _, err = s.dbClient.UpdateResourceDoc(ctx, op.doc.ExternalID, func(updateDoc *database.ResourceDocument) bool { var updated bool if op.id == updateDoc.ActiveOperationID { if opStatus != updateDoc.ProvisioningState { updateDoc.ProvisioningState = opStatus updated = true } if opStatus.IsTerminal() { updateDoc.ActiveOperationID = "" updated = true } } return updated }) if err != nil { return err } return nil } // maybePostAsyncNotification attempts to notify ARM of a completed asynchronous // operation if the initial request included an "Azure-AsyncNotificationUri" header. func (s *OperationsScanner) maybePostAsyncNotification(ctx context.Context, op operation) { if len(op.doc.NotificationURI) > 0 { err := s.postAsyncNotification(ctx, op) if err == nil { op.logger.Info("Posted async notification") } else { op.logger.Error(fmt.Sprintf("Failed to post async notification: %v", err.Error())) } } } // postAsyncNotification submits an POST request with status payload to the given URL. func (s *OperationsScanner) postAsyncNotification(ctx context.Context, op operation) error { // Refetch the operation document to provide the latest status. doc, err := s.dbClient.GetOperationDoc(ctx, op.pk, op.id) if err != nil { return err } data, err := arm.MarshalJSON(doc.ToStatus()) if err != nil { return err } request, err := http.NewRequestWithContext(ctx, http.MethodPost, doc.NotificationURI, bytes.NewBuffer(data)) if err != nil { return err } request.Header.Set("Content-Type", "application/json") response, err := s.notificationClient.Do(request) if err != nil { return err } defer response.Body.Close() if response.StatusCode >= 400 { return errors.New(response.Status) } return nil } // convertClusterStatus attempts to translate a ClusterStatus object from // Cluster Service into an ARM provisioning state and, if necessary, a // structured OData error. func (s *OperationsScanner) convertClusterStatus(ctx context.Context, logger *slog.Logger, clusterStatus *arohcpv1alpha1.ClusterStatus, current arm.ProvisioningState, internalId ocm.InternalID) (arm.ProvisioningState, *arm.CloudErrorBody, error) { var opStatus = current var opError *arm.CloudErrorBody var err error switch state := clusterStatus.State(); state { case arohcpv1alpha1.ClusterStateError: opStatus = arm.ProvisioningStateFailed // Provision error codes are defined in the CS repo: // https://gitlab.cee.redhat.com/service/uhc-clusters-service/-/blob/master/pkg/api/cluster_errors.go code := clusterStatus.ProvisionErrorCode() if code == "" { code = arm.CloudErrorCodeInternalServerError } message := clusterStatus.ProvisionErrorMessage() if message == "" { message = clusterStatus.Description() } // Construct the cloud error code depending on the provision error code. switch code { case InflightChecksFailedProvisionErrorCode: opError, err = s.convertInflightChecks(ctx, logger, internalId) if err != nil { return opStatus, opError, err } default: opError = &arm.CloudErrorBody{Code: code, Message: message} } case arohcpv1alpha1.ClusterStateInstalling: opStatus = arm.ProvisioningStateProvisioning case arohcpv1alpha1.ClusterStateReady: opStatus = arm.ProvisioningStateSucceeded case arohcpv1alpha1.ClusterStateUninstalling: opStatus = arm.ProvisioningStateDeleting case arohcpv1alpha1.ClusterStatePending, arohcpv1alpha1.ClusterStateValidating: // These are valid cluster states for ARO-HCP but there are // no unique ProvisioningState values for them. They should // only occur when ProvisioningState is Accepted. if current != arm.ProvisioningStateAccepted { err = fmt.Errorf("got ClusterState '%s' while ProvisioningState was '%s' instead of '%s'", state, current, arm.ProvisioningStateAccepted) } default: err = fmt.Errorf("unhandled ClusterState '%s'", state) } return opStatus, opError, err } // convertNodePoolStatus attempts to translate a NodePoolStatus object // from Cluster Service into an ARM provisioning state and, if necessary, // a structured OData error. func convertNodePoolStatus(nodePoolStatus *arohcpv1alpha1.NodePoolStatus, current arm.ProvisioningState) (arm.ProvisioningState, *arm.CloudErrorBody, error) { var opStatus = current var opError *arm.CloudErrorBody var err error switch state := NodePoolStateValue(nodePoolStatus.State().NodePoolStateValue()); state { case NodePoolStateValidating, NodePoolStatePending, NodePoolStateValidatingUpdate, NodePoolStatePendingUpdate: // These are valid node pool states for ARO-HCP but there are // no unique ProvisioningState values for them. They should // only occur when ProvisioningState is Accepted. if current != arm.ProvisioningStateAccepted { err = fmt.Errorf("got NodePoolStatusValue '%s' while ProvisioningState was '%s' instead of '%s'", state, current, arm.ProvisioningStateAccepted) } case NodePoolStateInstalling: opStatus = arm.ProvisioningStateProvisioning case NodePoolStateReady: opStatus = arm.ProvisioningStateSucceeded case NodePoolStateUpdating: opStatus = arm.ProvisioningStateUpdating case NodePoolStateUninstalling: opStatus = arm.ProvisioningStateDeleting case NodePoolStateRecoverableError, NodePoolStateError: // XXX OCM SDK offers no error code or message for failed node pool // operations so "Internal Server Error" is all we can do for now. // https://issues.redhat.com/browse/ARO-14969 opStatus = arm.ProvisioningStateFailed opError = arm.NewInternalServerError().CloudErrorBody default: err = fmt.Errorf("unhandled NodePoolState '%s'", state) } return opStatus, opError, err } // convertInflightChecks gets a cluster internal ID, fetches inflight check errors from CS endpoint, and converts them // to arm.CloudErrorBody type. // The function should be triggered only if inflight errors occurred with provision error code OCM4001. func (s *OperationsScanner) convertInflightChecks(ctx context.Context, logger *slog.Logger, internalId ocm.InternalID) (*arm.CloudErrorBody, error) { inflightChecks, err := s.clusterService.GetClusterInflightChecks(ctx, internalId) if err != nil { return &arm.CloudErrorBody{}, err } var cloudErrors []arm.CloudErrorBody for _, inflightCheck := range inflightChecks.Items() { if inflightCheck.State() == arohcpv1alpha1.InflightCheckStateFailed { cloudErrors = append(cloudErrors, convertInflightCheck(inflightCheck, logger)) } } // This is a fallback case and should not normally occur. If the provision error code is OCM4001, // there should be at least one inflight failure. if len(cloudErrors) == 0 { logger.Error(fmt.Sprintf( "Cluster '%s' returned error code OCM4001, but no inflight failures were found", internalId)) return &arm.CloudErrorBody{ Code: arm.CloudErrorCodeInternalServerError, }, nil } if len(cloudErrors) == 1 { return &arm.CloudErrorBody{ Code: cloudErrors[0].Code, Message: cloudErrors[0].Message, }, nil } return &arm.CloudErrorBody{ Code: arm.CloudErrorCodeMultipleErrorsOccurred, Message: "Content validation failed on multiple fields", Details: cloudErrors, }, nil } func convertInflightCheck(inflightCheck *arohcpv1alpha1.InflightCheck, logger *slog.Logger) arm.CloudErrorBody { message, succeeded := convertInflightCheckDetails(inflightCheck) if !succeeded { logger.Error(fmt.Sprintf("error converting inflight check '%s' details", inflightCheck.Name())) } return arm.CloudErrorBody{ Code: arm.CloudErrorCodeInternalServerError, Message: message, } } // convertInflightCheckDetails gets an inflight check object and extracts the error message. func convertInflightCheckDetails(inflightCheck *arohcpv1alpha1.InflightCheck) (string, bool) { details, ok := inflightCheck.GetDetails() if !ok { return "", false } detailsMap, ok := details.(map[string]interface{}) if !ok { return "", false } // Retrieve "error" key safely if errMsg, exists := detailsMap["error"]; exists { if errStr, ok := errMsg.(string); ok { return errStr, true } } return "", false }