cmd/core_plugin/manager/manager.go (204 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package manager is the module manager, it wraps the initialization and // mass notification of core-plugin's modules. package manager import ( "context" "errors" "fmt" "maps" "sync" "time" "github.com/GoogleCloudPlatform/galog" ) // ModuleStage is the stage in which a module should be grouped in. type ModuleStage int // InitType is the type of initialization a module should be executed. type InitType int // ModuleStatus is the status of a module initialization. type ModuleStatus int const ( // EarlyStage represents the early stage of core-plugin execution, modules in // such stage are executed on behalf of early platform initialization, it's // assumed that the OS is still not yet fully compatible/configured with GCE // platform. EarlyStage ModuleStage = iota // LateStage represents the late(r) stage of core-plugin execution, it takes // off after the early stage has finished and we've notified Guest Agent. LateStage // BlockingInit represents the module initialization executed in a blocking // manner. BlockingInit InitType = iota // ConcurrentInit represents the module initialization executed in a // concurrent manner. ConcurrentInit // StatusSkipped represents a module initialization that was skipped. StatusSkipped ModuleStatus = iota // StatusFailed represents a module initialization that failed. StatusFailed // StatusSucceeded represents a module initialization that succeeded. StatusSucceeded ) var ( // modManager is the module manager instance. modManager = moduleManager{ modules: make(map[ModuleStage][]*Module), metrics: make(map[string]*ModuleMetric), } ) // moduleManager is the module manager's context structure. type moduleManager struct { // mux is a mutex to protect the modules map. mux sync.Mutex // modules is a map of modules registered for a given stage. modules map[ModuleStage][]*Module // metrics is a map of module metrics. metrics map[string]*ModuleMetric } // ModuleMetric contains the module's initialization metrics. type ModuleMetric struct { // Module is the module that was initialized. Module *Module // Stage is the stage in which the module was initialized. Stage ModuleStage // Start is the time the module initialization started. Start time.Time // End is the time the module initialization ended. End time.Time // Err is the error/status of the module initialization. Err error // Status is the status of the module initialization. Status ModuleStatus // InitType is the type of initialization the module was executed. InitType InitType } // Metrics returns/exposes the modules metrics. The returned map is a copy of // the internal map and is safe to be modified after returned (both internally // and externally). func Metrics() map[string]*ModuleMetric { modManager.mux.Lock() defer modManager.mux.Unlock() res := make(map[string]*ModuleMetric) maps.Copy(res, modManager.metrics) return res } // newModuleMetric creates a new module metric. func newModuleMetric(mod *Module, stage ModuleStage, initType InitType) *ModuleMetric { metric := &ModuleMetric{ Module: mod, Stage: stage, Start: time.Now(), InitType: initType, } modManager.metrics[mod.ID] = metric return metric } // finish records the module initialization finishing time and status. func (mm *ModuleMetric) finish(status ModuleStatus, err error) { mm.End = time.Now() mm.Err = err mm.Status = status } // Module is the configuration structure of a module. type Module struct { // Enabled is a flag to indicate if the module is enabled/disabled in the // local configuration. Enabled *bool // ID is a string representation of a module identification. ID string // Description is a string representation of a module description. Description string // Setup is the function to initialize the module. Modules implementing this // function will have its execution ran in parallel with other modules - every // module Setup will be executed in a different goroutine. Setup func(ctx context.Context, data any) error // BlockSetup is equivalent to Setup but the modules initialization will // happen in sequence, meaning, a module initialization blocks the // initialization of all the non-initialized modules. BlockSetup func(ctx context.Context, data any) error // Quit is the function implemented by the module to get notifications to // "nicely" quit, the manager will wait for these executions to finish. When // returned from Quit the module is communicating that it's fully done. Quit func(ctx context.Context) } // Display returns a nice string with id and description of the module and is // used to display the module in the list of modules. func (mod *Module) Display() string { desc := mod.Description if desc == "" { desc = "No description available" } ID := mod.ID if ID == "" { ID = "No ID available" } return fmt.Sprintf("%s: %s.", ID, desc) } // modulesLen returns the number of modules registered for a given stage. func modulesLen(stage ModuleStage) int { modManager.mux.Lock() defer modManager.mux.Unlock() return len(modManager.modules[stage]) } // Register registers modules in a execution/initialization stage. The order the // modules are registered is honored. func Register(mods []*Module, stage ModuleStage) { modManager.mux.Lock() defer modManager.mux.Unlock() for _, mod := range mods { if mod == nil { continue } if mod.Enabled != nil && *mod.Enabled == false { galog.Debugf("Module %q is disabled, skipping.", mod.ID) continue } modManager.modules[stage] = append(modManager.modules[stage], mod) } } // List returns the list of modules registered for a given stage. func List(stage ModuleStage) []*Module { modManager.mux.Lock() defer modManager.mux.Unlock() return modManager.modules[stage] } // NotifyQuit notifies modules registered for a stage that they should nicely // quit. func NotifyQuit(ctx context.Context, stage ModuleStage) { modManager.mux.Lock() defer modManager.mux.Unlock() for _, mod := range modManager.modules[stage] { if mod.Quit == nil { galog.Debugf("Module %q has no Quit function, skipping.", mod.ID) continue } mod.Quit(ctx) } } // RunBlocking runs all modules in a given stage in a blocking manner. The // selection of modules is based on the existence of the BlockSetup function // implementation. It returns an error wrapping all errors returned by the // modules. func RunBlocking(ctx context.Context, stage ModuleStage, data any) error { modManager.mux.Lock() defer modManager.mux.Unlock() for _, mod := range modManager.modules[stage] { metric := newModuleMetric(mod, stage, BlockingInit) if mod.BlockSetup == nil { galog.V(2).Debugf("Module %q has no BlockSetup function, skipping.", mod.ID) metric.finish(StatusSkipped, nil) continue } if err := mod.BlockSetup(ctx, data); err != nil { metric.finish(StatusFailed, err) return fmt.Errorf("failed to initialize module(%s): %w", mod.ID, err) } metric.finish(StatusSucceeded, nil) } return nil } // Errors is a collection of module initialization/setup errors. type Errors struct { // mu is a mutex to protect the Errors slice. mu sync.Mutex // reportedErrors is the list of module initialization errors. reportedErrors []*moduleError } // moduleError is a module initialization error. type moduleError struct { // module is the module that failed to initialize. module *Module // err is the error returned by the module. err error } // Each runs a function for each module initialization error. func (e *Errors) Each(fc func(moduleID string, err error)) { e.mu.Lock() defer e.mu.Unlock() for _, err := range e.reportedErrors { fc(err.module.ID, err.err) } } // add adds a module initialization error to the list. func (e *Errors) add(mod *Module, err error) { e.mu.Lock() defer e.mu.Unlock() e.reportedErrors = append(e.reportedErrors, &moduleError{module: mod, err: err}) } // len returns the number of module initialization errors. func (e *Errors) len() int { e.mu.Lock() defer e.mu.Unlock() return len(e.reportedErrors) } // join returns an error joining all module initialization errors. func (e *Errors) join() error { e.mu.Lock() defer e.mu.Unlock() var errs []error for _, err := range e.reportedErrors { errs = append(errs, err.err) } return errors.Join(errs...) } // RunConcurrent runs all modules in a given stage in parallel. The selection of // of modules is based on the existence of the Setup function implementation. It // returns an Errors wrapping all errors returned by the modules. func RunConcurrent(ctx context.Context, stage ModuleStage, data any) *Errors { modManager.mux.Lock() defer modManager.mux.Unlock() var wg sync.WaitGroup errors := &Errors{} for _, mod := range modManager.modules[stage] { metrics := newModuleMetric(mod, stage, ConcurrentInit) if mod.Setup == nil { galog.V(2).Debugf("Module %q has no Setup function, skipping.", mod.ID) metrics.finish(StatusSkipped, nil) continue } wg.Add(1) go func(metrics *ModuleMetric) { defer wg.Done() if err := mod.Setup(ctx, data); err != nil { errors.add(mod, err) metrics.finish(StatusFailed, err) return } metrics.finish(StatusSucceeded, nil) }(metrics) } wg.Wait() if errors.len() > 0 { return errors } return nil } // Shutdown shuts down the module manager. func Shutdown() { modManager.mux.Lock() defer modManager.mux.Unlock() modManager.modules = make(map[ModuleStage][]*Module) }