pkg/frontend/frontend.go (431 lines of code) (raw):

package frontend // Copyright (c) Microsoft Corporation. // Licensed under the Apache License 2.0. import ( "context" "crypto/tls" "fmt" "log" "net" "net/http" "strings" "sync" "sync/atomic" "time" "github.com/Azure/go-autorest/autorest/azure" "github.com/go-chi/chi/v5" chiMiddlewares "github.com/go-chi/chi/v5/middleware" "github.com/sirupsen/logrus" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "github.com/Azure/ARO-RP/pkg/api" "github.com/Azure/ARO-RP/pkg/database" "github.com/Azure/ARO-RP/pkg/env" "github.com/Azure/ARO-RP/pkg/frontend/adminactions" "github.com/Azure/ARO-RP/pkg/frontend/middleware" "github.com/Azure/ARO-RP/pkg/hive" "github.com/Azure/ARO-RP/pkg/metrics" "github.com/Azure/ARO-RP/pkg/util/azureclient/azuresdk/azsecrets" "github.com/Azure/ARO-RP/pkg/util/bucket" "github.com/Azure/ARO-RP/pkg/util/clusterdata" "github.com/Azure/ARO-RP/pkg/util/encryption" "github.com/Azure/ARO-RP/pkg/util/heartbeat" utillog "github.com/Azure/ARO-RP/pkg/util/log" "github.com/Azure/ARO-RP/pkg/util/log/audit" "github.com/Azure/ARO-RP/pkg/util/recover" ) type statusCodeError int func (err statusCodeError) Error() string { return fmt.Sprintf("%d", err) } type frontendDBs interface { database.DatabaseGroupWithAsyncOperations database.DatabaseGroupWithOpenShiftVersions database.DatabaseGroupWithOpenShiftClusters database.DatabaseGroupWithAsyncOperations database.DatabaseGroupWithSubscriptions database.DatabaseGroupWithPlatformWorkloadIdentityRoleSets database.DatabaseGroupWithMaintenanceManifests } type kubeActionsFactory func(*logrus.Entry, env.Interface, *api.OpenShiftCluster) (adminactions.KubeActions, error) type azureActionsFactory func(*logrus.Entry, env.Interface, *api.OpenShiftCluster, *api.SubscriptionDocument) (adminactions.AzureActions, error) type appLensActionsFactory func(*logrus.Entry, env.Interface, *api.OpenShiftCluster, *api.SubscriptionDocument) (adminactions.AppLensActions, error) type frontend struct { auditLog *logrus.Entry baseLog *logrus.Entry env env.Interface logMiddleware middleware.LogMiddleware validateMiddleware middleware.ValidateMiddleware m middleware.MetricsMiddleware authMiddleware middleware.AuthMiddleware apiVersionMiddleware middleware.ApiVersionValidator maintenanceMiddleware middleware.MaintenanceMiddleware dbGroup frontendDBs defaultOcpVersion string // always enabled enabledOcpVersions map[string]*api.OpenShiftVersion availablePlatformWorkloadIdentityRoleSets map[string]*api.PlatformWorkloadIdentityRoleSet apis map[string]*api.Version lastOcpVersionsChangefeed atomic.Value //time.Time lastPlatformWorkloadIdentityRoleSetsChangefeed atomic.Value ocpVersionsMu sync.RWMutex platformWorkloadIdentityRoleSetsMu sync.RWMutex aead encryption.AEAD hiveClusterManager hive.ClusterManager hiveSyncSetManager hive.SyncSetManager kubeActionsFactory kubeActionsFactory azureActionsFactory azureActionsFactory appLensActionsFactory appLensActionsFactory skuValidator SkuValidator quotaValidator QuotaValidator providersValidator ProvidersValidator clusterEnricher clusterdata.BestEffortEnricher l net.Listener s *http.Server bucketAllocator bucket.Allocator startTime time.Time ready atomic.Value // these helps us to test and mock easier now func() time.Time systemDataClusterDocEnricher func(*api.OpenShiftClusterDocument, *api.SystemData) streamResponder StreamResponder } // Runnable represents a runnable object type Runnable interface { Run(context.Context, <-chan struct{}, chan<- struct{}) } // TODO: Get the number of function parameters under control :D // NewFrontend returns a new runnable frontend func NewFrontend(ctx context.Context, auditLog *logrus.Entry, baseLog *logrus.Entry, outelAuditClient audit.Client, _env env.Interface, dbGroup frontendDBs, apis map[string]*api.Version, m metrics.Emitter, clusterm metrics.Emitter, aead encryption.AEAD, hiveClusterManager hive.ClusterManager, hiveSyncSetManager hive.SyncSetManager, kubeActionsFactory kubeActionsFactory, azureActionsFactory azureActionsFactory, appLensActionsFactory appLensActionsFactory, enricher clusterdata.BestEffortEnricher, ) (*frontend, error) { f := &frontend{ logMiddleware: middleware.LogMiddleware{ EnvironmentName: _env.Environment().Name, Location: _env.Location(), Hostname: _env.Hostname(), BaseLog: baseLog.WithField("component", "access"), AuditLog: auditLog, OutelAuditClient: outelAuditClient, }, baseLog: baseLog, auditLog: auditLog, env: _env, apiVersionMiddleware: middleware.ApiVersionValidator{ APIs: api.APIs, }, validateMiddleware: middleware.ValidateMiddleware{ Location: _env.Location(), Apis: api.APIs, }, authMiddleware: middleware.AuthMiddleware{ Log: baseLog, EnableMISE: _env.FeatureIsSet(env.FeatureEnableMISE), EnforceMISE: _env.FeatureIsSet(env.FeatureEnforceMISE), AdminAuth: _env.AdminClientAuthorizer(), ArmAuth: _env.ArmClientAuthorizer(), MiseAuth: _env.MISEAuthorizer(), }, dbGroup: dbGroup, apis: apis, m: middleware.MetricsMiddleware{Emitter: m}, maintenanceMiddleware: middleware.MaintenanceMiddleware{Emitter: clusterm}, aead: aead, hiveClusterManager: hiveClusterManager, hiveSyncSetManager: hiveSyncSetManager, kubeActionsFactory: kubeActionsFactory, azureActionsFactory: azureActionsFactory, appLensActionsFactory: appLensActionsFactory, quotaValidator: quotaValidator{}, skuValidator: skuValidator{}, providersValidator: providersValidator{}, clusterEnricher: enricher, enabledOcpVersions: map[string]*api.OpenShiftVersion{}, availablePlatformWorkloadIdentityRoleSets: map[string]*api.PlatformWorkloadIdentityRoleSet{}, bucketAllocator: &bucket.Random{}, startTime: time.Now(), now: time.Now, systemDataClusterDocEnricher: enrichClusterSystemData, streamResponder: defaultResponder{}, } l, err := f.env.Listen() if err != nil { return nil, err } certificate, err := f.env.ServiceKeyvault().GetSecret(ctx, env.RPServerSecretName, "", nil) if err != nil { return nil, err } key, certs, err := azsecrets.ParseSecretAsCertificate(certificate) if err != nil { return nil, err } config := &tls.Config{ Certificates: []tls.Certificate{ { PrivateKey: key, }, }, NextProtos: []string{"h2", "http/1.1"}, ClientAuth: tls.RequestClientCert, SessionTicketsDisabled: true, MinVersion: tls.VersionTLS12, CurvePreferences: []tls.CurveID{ tls.CurveP256, tls.X25519, }, } for _, cert := range certs { config.Certificates[0].Certificate = append(config.Certificates[0].Certificate, cert.Raw) } f.l = tls.NewListener(l, config) f.ready.Store(true) return f, nil } func (f *frontend) chiUnauthenticatedRoutes(router chi.Router) { router.Get("/healthz/ready", f.getReady) } func (f *frontend) chiAuthenticatedRoutes(router chi.Router) { r := router.With(f.authMiddleware.Authenticate) r.Route("/subscriptions/{subscriptionId}", func(r chi.Router) { r.Route("/resourcegroups/{resourceGroupName}/providers/{resourceProviderNamespace}/{resourceType}", func(r chi.Router) { r.With(f.apiVersionMiddleware.ValidateAPIVersion).Get("/", f.getOpenShiftClusters) r.Route("/{resourceName}", func(r chi.Router) { r.With(f.apiVersionMiddleware.ValidateAPIVersion).Route("/", func(r chi.Router) { r.Delete("/", f.deleteOpenShiftCluster) r.Get("/", f.getOpenShiftCluster) if f.env.IsLocalDevelopmentMode() { r.With(middleware.MockMSIMiddleware).Patch("/", f.putOrPatchOpenShiftCluster) r.With(middleware.MockMSIMiddleware).Put("/", f.putOrPatchOpenShiftCluster) } else { r.Patch("/", f.putOrPatchOpenShiftCluster) r.Put("/", f.putOrPatchOpenShiftCluster) } r.Post("/listcredentials", f.postOpenShiftClusterCredentials) r.Post("/listadmincredentials", f.postOpenShiftClusterKubeConfigCredentials) }) r.Get("/detectors", f.listAppLensDetectors) r.Get("/detectors/{detectorId}", f.getAppLensDetector) }) }) r.Route("/resourcegroups/{resourceGroupName}/providers/{resourceProviderNamespace}/deployments/{deploymentName}/preflight", func(r chi.Router) { r.Use(f.apiVersionMiddleware.ValidatePreflightAPIVersion) r.Post("/", f.preflightValidation) }) r.Route("/providers/{resourceProviderNamespace}", func(r chi.Router) { r.Use(f.apiVersionMiddleware.ValidateAPIVersion) r.Get("/{resourceType}", f.getOpenShiftClusters) r.Route("/locations/{location}", func(r chi.Router) { r.Get("/operationsstatus/{operationId}", f.getAsyncOperationsStatus) r.Get("/operationresults/{operationId}", f.getAsyncOperationResult) r.Get("/openshiftversions", f.listInstallVersions) r.Get("/openshiftversions/{openshiftVersion}", f.getInstallVersion) r.Get("/platformworkloadidentityrolesets", f.listPlatformWorkloadIdentityRoleSets) r.Get("/platformworkloadidentityrolesets/{openShiftMinorVersion}", f.getPlatformWorkloadIdentityRoleSet) }) }) }) //Admin Actions r.Route("/admin", func(r chi.Router) { r.Route("/versions", func(r chi.Router) { r.Get("/", f.getAdminOpenShiftVersions) r.Put("/", f.putAdminOpenShiftVersion) }) r.Route("/platformworkloadidentityrolesets", func(r chi.Router) { r.Get("/", f.getAdminPlatformWorkloadIdentityRoleSets) r.Put("/", f.putAdminPlatformWorkloadIdentityRoleSet) }) r.Get("/supportedvmsizes", f.supportedvmsizes) r.Route("/maintenancemanifests", func(r chi.Router) { r.Get("/queued", f.getAdminQueuedMaintManifests) }) r.Route("/hivesyncset", func(r chi.Router) { r.Get("/", f.listAdminHiveSyncSet) r.Get("/syncsetname/{syncsetname}", f.getAdminHiveSyncSet) }) r.Route("/subscriptions/{subscriptionId}", func(r chi.Router) { r.Route("/resourcegroups/{resourceGroupName}/providers/{resourceProviderNamespace}/{resourceType}/{resourceName}", func(r chi.Router) { // Top pods metrics endpoint r.Get("/top/pods", f.getAdminTopPods) // Top nodes metrics endpoint r.Get("/top/nodes", f.getAdminTopNodes) // Etcd recovery r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/etcdrecovery", f.postAdminOpenShiftClusterEtcdRecovery) // Kubernetes objects r.Get("/kubernetesobjects", f.getAdminKubernetesObjects) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/kubernetesobjects", f.postAdminKubernetesObjects) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Delete("/kubernetesobjects", f.deleteAdminKubernetesObjects) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/approvecsr", f.postAdminOpenShiftClusterApproveCSR) // Pod logs r.Get("/kubernetespodlogs", f.getAdminKubernetesPodLogs) r.Get("/resources", f.listAdminOpenShiftClusterResources) r.Get("/serialconsole", f.getAdminOpenShiftClusterSerialConsole) r.Get("/clusterdeployment", f.getAdminHiveClusterDeployment) r.Get("/clustersync", f.getAdminHiveClusterSync) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/redeployvm", f.postAdminOpenShiftClusterRedeployVM) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/stopvm", f.postAdminOpenShiftClusterStopVM) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/startvm", f.postAdminOpenShiftClusterStartVM) r.Get("/skus", f.getAdminOpenShiftClusterVMResizeOptions) // We don't emit unplanned maintenance signal for resize since it is only used for planned maintenance r.Post("/resize", f.postAdminOpenShiftClusterVMResize) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/reconcilefailednic", f.postAdminReconcileFailedNIC) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/cordonnode", f.postAdminOpenShiftClusterCordonNode) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/drainnode", f.postAdminOpenShiftClusterDrainNode) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/etcdcertificaterenew", f.postAdminOpenShiftClusterEtcdCertificateRenew) r.With(f.maintenanceMiddleware.UnplannedMaintenanceSignal).Post("/deletemanagedresource", f.postAdminOpenShiftDeleteManagedResource) // MIMO r.Route("/maintenancemanifests", func(r chi.Router) { r.Get("/", f.getAdminMaintManifests) r.Put("/", f.putAdminMaintManifestCreate) r.Route("/{manifestId}", func(r chi.Router) { r.Get("/", f.getSingleAdminMaintManifest) r.Delete("/", f.deleteAdminMaintManifest) r.Post("/cancel", f.postAdminMaintManifestCancel) }) }) }) }) // Operations r.Route("/providers/{resourceProviderNamespace}", func(r chi.Router) { r.Get("/{resourceType}", f.getAdminOpenShiftClusters) }) }) r.Put("/subscriptions/{subscriptionId}", f.putSubscription) r.With(f.apiVersionMiddleware.ValidateAPIVersion).Get("/providers/{resourceProviderNamespace}/operations", f.getOperations) } func notFound(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") api.WriteError(w, http.StatusNotFound, api.CloudErrorCodeNotFound, "", "The requested path could not be found.") } func (f *frontend) setupRouter() chi.Router { chiRouter := chi.NewMux() chiRouter.Use(chiMiddlewares.CleanPath) chiRouter.NotFound(f.authMiddleware.Authenticate(http.HandlerFunc(notFound)).ServeHTTP) registered := chiRouter.With( chiMiddlewares.CleanPath, f.logMiddleware.Log, f.m.Metrics, middleware.Panic, middleware.Headers, f.validateMiddleware.Validate, middleware.Body, middleware.SystemData) f.chiAuthenticatedRoutes(registered) f.chiUnauthenticatedRoutes(registered) return chiRouter } func (f *frontend) Run(ctx context.Context, stop <-chan struct{}, done chan<- struct{}) { defer recover.Panic(f.baseLog) go f.changefeedOcpVersions(ctx) go f.changefeedRoleSets(ctx) if stop != nil { go func() { defer recover.Panic(f.baseLog) <-stop if !f.env.FeatureIsSet(env.FeatureDisableReadinessDelay) { // mark not ready and wait for ((#probes + 1) * interval + longest // connection timeout + margin) to stop receiving new connections f.baseLog.Print("marking not ready and waiting 80 seconds") f.ready.Store(false) time.Sleep(80 * time.Second) } f.baseLog.Print("exiting") close(done) }() } f.s = &http.Server{ Handler: middleware.Lowercase(f.setupRouter()), ReadTimeout: 10 * time.Second, IdleTimeout: 2 * time.Minute, ErrorLog: log.New(f.baseLog.Writer(), "", 0), BaseContext: func(net.Listener) context.Context { return ctx }, } go heartbeat.EmitHeartbeat(f.baseLog, f.m, "frontend.heartbeat", stop, f.checkReady) err := f.s.Serve(f.l) if err != http.ErrServerClosed { f.baseLog.Error(err) } } func adminReply(log *logrus.Entry, w http.ResponseWriter, header http.Header, b []byte, err error) { if apiErr, ok := err.(kerrors.APIStatus); ok { status := apiErr.Status() var target string if status.Details != nil { gk := schema.GroupKind{ Group: status.Details.Group, Kind: status.Details.Kind, } target = fmt.Sprintf("%s/%s", gk, status.Details.Name) } err = &api.CloudError{ StatusCode: int(status.Code), CloudErrorBody: &api.CloudErrorBody{ Code: string(status.Reason), Message: status.Message, Target: target, }, } } reply(log, w, header, b, err) } func reply(log *logrus.Entry, w http.ResponseWriter, header http.Header, b []byte, err error) { for k, v := range header { w.Header()[k] = v } if err != nil { switch err := err.(type) { case *api.CloudError: log.Info(err) api.WriteCloudError(w, err) return case statusCodeError: w.WriteHeader(int(err)) default: log.Error(err) api.WriteError(w, http.StatusInternalServerError, api.CloudErrorCodeInternalServerError, "", "Internal server error.") return } } if b != nil { _, _ = w.Write(b) _, _ = w.Write([]byte{'\n'}) } } func frontendOperationResultLog(log *logrus.Entry, method string, err error) { log = log.WithFields(logrus.Fields{ "LOGKIND": "frontendqos", "resultType": utillog.SuccessResultType, "operationType": method, }) if err == nil { log.Info("front end operation succeeded") return } var statusCode int switch err := err.(type) { case *api.CloudError: statusCode = err.StatusCode case statusCodeError: statusCode = int(err) default: statusCode = 500 } resultType := utillog.MapStatusCodeToResultType(statusCode) log = log.WithField("resultType", resultType) if resultType == utillog.SuccessResultType { log.Info("front end operation succeeded") return } log = log.WithField("errorDetails", err.Error()) log.Info("front end operation failed") } // resourceIdFromURLParams returns an Azure Resource ID built out of the // individual parameters of the URL. func resourceIdFromURLParams(r *http.Request) string { subID, resType, resProvider, resName, resGroupName := chi.URLParam(r, "subscriptionId"), chi.URLParam(r, "resourceType"), chi.URLParam(r, "resourceProviderNamespace"), chi.URLParam(r, "resourceName"), chi.URLParam(r, "resourceGroupName") return strings.ToLower(azure.Resource{ SubscriptionID: subID, ResourceGroup: resGroupName, ResourceType: resType, ResourceName: resName, Provider: resProvider, }.String()) }