internal/plugin/manager/pluginmanager.go (569 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"context"
"encoding/gob"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/galog"
acpb "github.com/GoogleCloudPlatform/google-guest-agent/internal/acp/proto/google_guest_agent/acp"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/acs/client"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/boundedlist"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/ps"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/scheduler"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/utils/file"
"google.golang.org/protobuf/proto"
tpb "google.golang.org/protobuf/types/known/timestamppb"
)
const (
// agentStateDir is where all agent state is stored. This includes plugin
// information and other information agent might want to store.
agentStateDir = "agent_state"
// pluginInfoDir is where all plugin information is stored within agent state.
pluginInfoDir = "plugin_info"
// healthCheckFrequency is the frequency at which plugin health check is
// executed.
healthCheckFrequency = 10 * time.Second
// metricsCheckFrequency is the default frequency at which plugin metrics
// check is executed.
metricsCheckFrequency = 10 * time.Second
// maxMetricDatapoints is the maximum number of datapoints to be stored in the
// metric list in memory.
maxMetricDatapoints = 60
)
// pluginManager is the instance of plugin manager.
var pluginManager *PluginManager
// PluginManager struct represents the plugins that plugin manager manages.
type PluginManager struct {
// mu protects the plugins map.
mu sync.RWMutex
// plugins is the map of plugin name and plugin managed by plugin manager.
plugins map[string]*Plugin
// pluginMonitorMu protects the pluginMonitors map.
pluginMonitorMu sync.Mutex
// pluginMonitors is the map of plugin and plugin monitor ID monitoring
// plugin.
pluginMonitors map[string]string
// pluginMetricsMu protects the pluginMetrics map.
pluginMetricsMu sync.Mutex
// pluginMetricsMonitors is the map of plugin and plugin metrics ID monitoring
// plugin.
pluginMetricsMonitors map[string]string
// scheduler is the scheduler used by plugin manager to schedule plugins
// monitoring.
scheduler *scheduler.Scheduler
// protocol is the protocol used by plugin manager to communicate with all
// plugins.
protocol string
// pendingPluginRevisionsMu protects the pendingPluginRevisions map.
pendingPluginRevisionsMu sync.RWMutex
// inProgressPluginRequests keep tracks of the pending plugin requests. This
// allows us to ignore new requests if previous request is still in progress
// for the same plugin.
inProgressPluginRequests map[string]bool
// requestCountMu protects the requestCount map.
requestCountMu sync.Mutex
// requestCount keeps track of the number of requests received for each
// action and also the number of successful and failures requests.
requestCount map[acpb.ConfigurePluginStates_Action]map[bool]int
// instanceIDMu protects the instanceID.
instanceIDMu sync.Mutex
// instanceID is the instance ID of the VM plugin manager is running on.
instanceID string
// IsInitialized indicates if plugin manager is initialized.
IsInitialized atomic.Bool
}
// agentPluginState returns the path to the directory where agent maintains
// plugin state.
func agentPluginState() string {
return filepath.Join(baseState(), agentStateDir, pluginInfoDir)
}
// Instance returns the previously initialized instance of plugin manager.
func Instance() *PluginManager {
return pluginManager
}
func init() {
pluginManager = &PluginManager{
protocol: tcpProtocol,
pluginMonitors: make(map[string]string),
pluginMetricsMonitors: make(map[string]string),
scheduler: scheduler.Instance(),
inProgressPluginRequests: make(map[string]bool),
requestCount: make(map[acpb.ConfigurePluginStates_Action]map[bool]int),
}
}
func (m *PluginManager) cleanupOldState(ctx context.Context, path string) error {
re := regexp.MustCompile("^[0-9]+$")
if !file.Exists(path, file.TypeDir) {
// This is not an error, it just means there's nothing to clean up, which
// can happen if the agent is started for the first time or plugins were
// never installed.
galog.Debugf("Plugin state directory %q does not exist, skipping cleanup", path)
return nil
}
dirs, err := os.ReadDir(path)
if err != nil {
return fmt.Errorf("failed to read directory %q: %w", path, err)
}
currentID := m.currentInstanceID()
for _, dir := range dirs {
absPath := filepath.Join(path, dir.Name())
// Skip the current instance directory and any non-numeric directories,
// these are most likely not agent created.
if !dir.IsDir() || dir.Name() == currentID || !re.MatchString(dir.Name()) {
galog.V(2).Debugf("Skipping %q from plugin manager old state cleanup", absPath)
continue
}
galog.Debugf("Removing previous plugin state %q", absPath)
if err := os.RemoveAll(absPath); err != nil {
return fmt.Errorf("failed to remove file %q: %w", absPath, err)
}
}
return nil
}
func (m *PluginManager) setInstanceID(id string) {
m.instanceIDMu.Lock()
defer m.instanceIDMu.Unlock()
m.instanceID = id
}
func (m *PluginManager) currentInstanceID() string {
m.instanceIDMu.Lock()
defer m.instanceIDMu.Unlock()
return m.instanceID
}
// InitAdHocPluginManager initializes and returns a PluginManager instance for
// ad-hoc requests that does not go through ACS. This is generally used by other
// local processes to temporarily leverage plugin manager functionality. Use
// this instead of InitPluginManager if there's no active plugin management
// required. InitAdHocPluginManager skips some initialization steps like
// scheduling plugin monitors that are not required for ad-hoc requests.
func InitAdHocPluginManager(ctx context.Context, instanceID string) (*PluginManager, error) {
galog.Infof("Initializing ad-hoc plugin manager for instance %q", instanceID)
pluginManager.setInstanceID(instanceID)
plugins, err := load(agentPluginState())
if err != nil {
return nil, fmt.Errorf("unable to load existing plugin state: %w", err)
}
pluginManager.plugins = plugins
return pluginManager, nil
}
// StopPlugin stops the plugin. This is used for ad-hoc requests that does not
// go through ACS. In case the plugin is not found or is not running, it returns
// nil error and is a no-op.
func (m *PluginManager) StopPlugin(ctx context.Context, name string) error {
galog.Infof("Stopping plugin %q", name)
plugin, err := m.fetch(name)
if err != nil {
galog.Infof("Plugin %q state not found [err: %v], skipping stop request", name, err)
return nil
}
if err := plugin.Connect(ctx); err != nil {
galog.Infof("Failed to connect to plugin %q [err: %v], skipping graceful stop", plugin.FullName(), err)
}
return m.stopAndRemovePlugin(ctx, plugin)
}
// InitPluginManager initializes and returns a PluginManager instance.
// Plugin Manager can be initialized and used to support core plugins even if
// ACS is disabled. Plugin Manager will be initialized during early Guest Agent
// startup to configure the core plugins.
func InitPluginManager(ctx context.Context, instanceID string) (*PluginManager, error) {
galog.Infof("Initializing plugin manager for instance %q", instanceID)
pluginManager.setInstanceID(instanceID)
// Cleanup old plugin state in a separate goroutine. This operation is not
// critical for plugin manager initialization and should not block it.
go func() {
if err := pluginManager.cleanupOldState(ctx, filepath.Dir(baseState())); err != nil {
galog.Errorf("Failed to cleanup old plugin state: %v", err)
}
}()
plugins, err := load(agentPluginState())
if err != nil {
return nil, fmt.Errorf("unable to load existing plugin state: %w", err)
}
pluginManager.plugins = plugins
if err := RegisterCmdHandler(ctx); err != nil {
return nil, fmt.Errorf("failed to register plugin command handler: %w", err)
}
wg := sync.WaitGroup{}
pluginManager.pendingPluginRevisionsMu.Lock()
for _, p := range plugins {
pluginManager.inProgressPluginRequests[p.Name] = true
wg.Add(1)
go func(p *Plugin) {
// Regardless of the outcome, we should remove the plugin from the pending
// list as request is no longer in process for this plugin.
defer func() {
pluginManager.pendingPluginRevisionsMu.Lock()
defer pluginManager.pendingPluginRevisionsMu.Unlock()
delete(pluginManager.inProgressPluginRequests, p.Name)
wg.Done()
}()
if err := connectOrReLaunch(ctx, p); err != nil {
galog.Errorf("Failed to connect or relaunch plugin %q: %v", p.FullName(), err)
} else {
pluginManager.startPluginSchedulers(ctx, p)
}
}(p)
}
pluginManager.IsInitialized.Store(true)
pluginManager.pendingPluginRevisionsMu.Unlock()
wg.Wait()
if isUDSSupported() {
pluginManager.protocol = udsProtocol
}
return pluginManager, nil
}
// RemoveAllDynamicPlugins filters out core plugins and triggers plugin manager
// to remove all dynamic plugins on the host. It also removes the base state
// directory to ensure that the entire state is cleaned up.
func (m *PluginManager) RemoveAllDynamicPlugins(ctx context.Context) error {
var reqs []*acpb.ConfigurePluginStates_ConfigurePlugin
galog.Infof("Removing all dynamic plugins")
var toRemove []*Plugin
for _, p := range m.list() {
if p.PluginType == PluginTypeCore {
galog.Debugf("Skipping core plugin %q, it will be removed by package manager", p.Name)
continue
}
toRemove = append(toRemove, p)
}
errChan := make(chan error, len(reqs))
wg := sync.WaitGroup{}
for _, plugin := range toRemove {
wg.Add(1)
go func(p *Plugin) {
var err error
defer wg.Done()
if err = m.StopPlugin(ctx, p.Name); err != nil {
galog.Infof("Remove plugin %q completed with error: [%v]", p.FullName(), err)
errChan <- err
}
}(plugin)
}
go func() {
wg.Wait()
baseStateDir := baseState()
if file.Exists(baseStateDir, file.TypeDir) {
if err := os.RemoveAll(baseStateDir); err != nil {
errChan <- fmt.Errorf("failed to remove file %q: %w", baseStateDir, err)
}
}
close(errChan)
}()
var errs error
for err := range errChan {
errs = errors.Join(errs, err)
}
return errs
}
// ListPluginStates returns the plugin states and cached health check
// information.
func (m *PluginManager) ListPluginStates(ctx context.Context, req *acpb.ListPluginStates) *acpb.CurrentPluginStates {
galog.Debugf("Handling list plugin state request: %+v", req)
var states []*acpb.CurrentPluginStates_DaemonPluginState
plugins := m.list()
for _, p := range plugins {
status := &acpb.CurrentPluginStates_DaemonPluginState_Status{Status: p.State()}
h := p.healthInfo()
if h != nil {
status.ResponseCode = h.responseCode
status.Results = h.messages
status.UpdateTime = tpb.New(h.timestamp)
}
p.RuntimeInfo.metricsMu.Lock()
var pluginMetrics []*acpb.CurrentPluginStates_DaemonPluginState_Metric
for _, metric := range p.RuntimeInfo.metrics.All() {
monitorMetric := &acpb.CurrentPluginStates_DaemonPluginState_Metric{
Timestamp: metric.timestamp,
CpuUsage: metric.cpuUsage,
MemoryUsage: metric.memoryUsage,
}
pluginMetrics = append(pluginMetrics, monitorMetric)
}
state := &acpb.CurrentPluginStates_DaemonPluginState{
Name: p.Name,
CurrentRevisionId: p.Revision,
CurrentPluginStatus: status,
CurrentPluginMetrics: pluginMetrics,
}
pendingStatus := p.pendingStatus()
if pendingStatus != nil {
state.PendingRevisionId = pendingStatus.revision
}
// Flush the metrics array.
p.RuntimeInfo.metrics.Reset()
// Release the metrics lock.
p.RuntimeInfo.metricsMu.Unlock()
// Append the state to the list.
states = append(states, state)
}
return &acpb.CurrentPluginStates{DaemonPluginStates: states}
}
// ConfigurePluginStates configures the plugin states as stated in the request.
// localPlugin identifies if the plugin is a core plugin. These core plugins are
// installed by package managers but not launched along with Guest Agent binary.
// Plugin Manager will launch and manage lifecycle of core plugins along with
// other dynamic plugins.
func (m *PluginManager) ConfigurePluginStates(ctx context.Context, req *acpb.ConfigurePluginStates, localPlugin bool) {
galog.Debugf("Handling configure plugin state request: %+v, local plugin: %t", req, localPlugin)
wg := sync.WaitGroup{}
var toProcess []*acpb.ConfigurePluginStates_ConfigurePlugin
m.pendingPluginRevisionsMu.Lock()
for _, req := range req.GetConfigurePlugins() {
_, inProgress := m.inProgressPluginRequests[req.GetPlugin().GetName()]
if inProgress {
// There's already a request in progress for this plugin, ignore this
// request.
galog.Infof("Ignoring request to %v plugin %q, another request already in progress", req.GetAction(), req.GetPlugin().GetName())
continue
}
toProcess = append(toProcess, req)
m.inProgressPluginRequests[req.GetPlugin().GetName()] = true
}
m.pendingPluginRevisionsMu.Unlock()
for _, req := range toProcess {
wg.Add(1)
go func(req *acpb.ConfigurePluginStates_ConfigurePlugin) {
defer wg.Done()
// Regardless of the outcome, we should remove the plugin from the pending
// list as request is no longer in process for this plugin.
defer func() {
m.pendingPluginRevisionsMu.Lock()
defer m.pendingPluginRevisionsMu.Unlock()
delete(m.inProgressPluginRequests, req.GetPlugin().GetName())
}()
m.configurePlugin(ctx, req, localPlugin)
}(req)
}
wg.Wait()
galog.Debugf("Configure plugin state request completed")
}
// list returns the list of currently managed plugins.
func (m *PluginManager) list() []*Plugin {
m.mu.RLock()
defer m.mu.RUnlock()
var plugins []*Plugin
for _, p := range m.plugins {
plugins = append(plugins, p)
}
return plugins
}
// fetch returns the plugin instance with the given name.
func (m *PluginManager) fetch(name string) (*Plugin, error) {
m.mu.RLock()
defer m.mu.RUnlock()
p, ok := m.plugins[name]
if !ok {
return nil, fmt.Errorf("plugin %q not found", name)
}
return p, nil
}
// add stores the plugin instance with the given name.
func (m *PluginManager) add(p *Plugin) {
m.mu.Lock()
defer m.mu.Unlock()
m.plugins[p.Name] = p
}
// delete deletes the plugin instance with the given name.
func (m *PluginManager) delete(name string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.plugins, name)
}
func (m *PluginManager) configurePlugin(ctx context.Context, req *acpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) {
var success bool
switch req.GetAction() {
case acpb.ConfigurePluginStates_INSTALL:
if err := m.installPlugin(ctx, req, localPlugin); err != nil {
galog.Errorf("Failed to install plugin %q, revision %q: %v", req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId(), err)
} else {
success = true
}
case acpb.ConfigurePluginStates_REMOVE:
if err := m.removePlugin(ctx, req); err != nil {
galog.Errorf("Failed to remove plugin %q, revision %q: %v", req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId(), err)
} else {
success = true
}
default:
galog.Warnf("Unknown action (%s) for configure plugin state request, ignoring", req.GetAction().String())
}
m.requestCountMu.Lock()
defer m.requestCountMu.Unlock()
if m.requestCount[req.GetAction()] == nil {
m.requestCount[req.GetAction()] = make(map[bool]int)
}
m.requestCount[req.GetAction()][success]++
}
// setMetricConfig sets the default metric configurations for the plugin or
// overrides it from request proto if provided.
func (p *Plugin) setMetricConfig(req *acpb.ConfigurePluginStates_ConfigurePlugin) {
p.Manifest.MetricsInterval = metricsCheckFrequency
p.Manifest.MaxMetricDatapoints = maxMetricDatapoints
if points := req.GetManifest().GetMaxMetricDatapoints(); points != 0 {
p.Manifest.MaxMetricDatapoints = uint(points)
}
if interval := req.GetManifest().GetMetricsInterval().GetSeconds(); interval != 0 {
p.Manifest.MetricsInterval = time.Duration(interval) * time.Second
}
p.RuntimeInfo.metrics = boundedlist.New[Metric](p.Manifest.MaxMetricDatapoints)
}
// newPluginManifest generates agent representation of the manifest from the
// install request.
func newPluginManifest(req *acpb.ConfigurePluginStates_ConfigurePlugin) (*Manifest, error) {
manifest := &Manifest{
StartAttempts: int(req.GetManifest().GetStartAttemptCount()),
MaxMemoryUsage: req.GetManifest().GetMaxMemoryUsageBytes(),
MaxCPUUsage: req.GetManifest().GetMaxCpuUsagePercentage(),
StopTimeout: time.Duration(req.GetManifest().GetStopTimeout().GetSeconds()) * time.Second,
StartTimeout: time.Duration(req.GetManifest().GetStartTimeout().GetSeconds()) * time.Second,
StartConfig: &ServiceConfig{},
}
if req.GetManifest().Config == nil {
return manifest, nil
}
switch req.GetManifest().Config.(type) {
case *acpb.ConfigurePluginStates_Manifest_StringConfig:
manifest.StartConfig.Simple = req.GetManifest().GetStringConfig()
case *acpb.ConfigurePluginStates_Manifest_StructConfig:
// Marshal the service config to a byte array to persist it on disk. This
// will be un-marshaled to use at plugin launch time.
bytes, err := proto.Marshal(req.GetManifest().GetStructConfig())
if err != nil {
return nil, fmt.Errorf("unable to marshal service config: %w", err)
}
manifest.StartConfig.Structured = bytes
}
return manifest, nil
}
// newPlugin creates a new plugin instance from the request.
// Rest of the plugin instance values are set at run time when install
// steps are executed on it.
func newPlugin(req *acpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) (*Plugin, error) {
p := &Plugin{
Name: req.GetPlugin().GetName(),
PluginType: PluginTypeDynamic,
Revision: req.GetPlugin().GetRevisionId(),
RuntimeInfo: &RuntimeInfo{},
}
manifest, err := newPluginManifest(req)
if err != nil {
return nil, fmt.Errorf("unable to generate plugin service config: %w", err)
}
p.Manifest = manifest
if localPlugin {
// Dynamic plugins are installed in a specific directory, that install
// workflow sets its install path. In case of local plugins since they're
// already present on disk and directory is known set its install path here
// itself.
p.InstallPath = filepath.Dir(req.GetPlugin().GetEntryPoint())
// Only core plugins can be present on disk before Plugin Manager installs.
p.PluginType = PluginTypeCore
}
p.setMetricConfig(req)
return p, nil
}
// installPlugin installs checks if the plugin already exists and does a
// fresh install or removes existing plugin revision and installs a new one.
func (m *PluginManager) installPlugin(ctx context.Context, req *acpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) error {
galog.Infof("Installing plugin %q, revision %q", req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId())
plugin, err := newPlugin(req, localPlugin)
if err != nil {
return fmt.Errorf("failed to create new plugin instance: %w", err)
}
sendEvent(ctx, plugin, acpb.PluginEventMessage_PLUGIN_CONFIG_INSTALL, "Received request to install a plugin.")
currPlugin, err := m.fetch(req.GetPlugin().GetName())
if err == nil && currPlugin.Revision == req.GetPlugin().GetRevisionId() {
sendEvent(ctx, currPlugin, acpb.PluginEventMessage_PLUGIN_INSTALL_FAILED, "Plugin is already installed or being processed.")
return fmt.Errorf("plugin %q is already installed or being processed", currPlugin.FullName())
}
if currPlugin != nil {
return m.upgradePlugin(ctx, req, localPlugin)
}
steps := m.generateInstallWorkflow(ctx, req, localPlugin)
return m.runlaunchPluginSteps(ctx, plugin, steps)
}
// startPluginSchedulers starts all scheduler jobs for the plugin.
func (m *PluginManager) startPluginSchedulers(ctx context.Context, plugin *Plugin) {
// At this point plugin is already running, run them in a separate go routine
// to avoid blocking the main thread. These jobs are configured to start
// immediately scheduling them synchronously would block the caller until they
// finish first run.
go func() {
m.startMonitoring(ctx, plugin)
m.startMetricsMonitoring(ctx, plugin)
}()
}
// runlaunchPluginSteps runs the steps to launch the plugin. Steps differ for
// install and upgrade but the reaction to success/failure is the same.
func (m *PluginManager) runlaunchPluginSteps(ctx context.Context, plugin *Plugin, steps []Step) error {
// Store the plugin in the manager as soon as we start processing so
// [ListPluginStates] can send intermediate states as well.
m.add(plugin)
if err := plugin.runSteps(ctx, steps); err != nil {
// Delete the plugin from the list in case it fails, agent will start fresh
// if ACS requests to install the plugin again.
m.delete(plugin.Name)
sendEvent(ctx, plugin, acpb.PluginEventMessage_PLUGIN_INSTALL_FAILED, fmt.Sprintf("Failed to install plugin: %v", err))
// If the installation fails, try cleaning up the process to prevent
// potential conflicts or errors that unmanaged processes could cause.
if err := ps.KillProcess(plugin.pid(), ps.KillModeNoWait); err != nil {
// Just log the error, process might have crashed, already exited or not
// successfully launched at all.
galog.Warnf("Stop plugin %q finished with error: %v", plugin.FullName(), err)
}
// Return original install error.
return fmt.Errorf("install plugin %q: %w", plugin.FullName(), err)
}
m.startPluginSchedulers(ctx, plugin)
sendEvent(ctx, plugin, acpb.PluginEventMessage_PLUGIN_INSTALLED, "Successfully installed the plugin.")
galog.Infof("Successfully installed plugin %q", plugin.FullName())
return nil
}
// upgradePlugin handles the plugin revision upgrades. It downloads and unpacks
// the new plugin revision, stops the old plugin revision and then launches the
// new one.
func (m *PluginManager) upgradePlugin(ctx context.Context, req *acpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) error {
plugin, err := newPlugin(req, localPlugin)
if err != nil {
return fmt.Errorf("failed to create new plugin instance: %w", err)
}
currPlugin, err := m.fetch(req.GetPlugin().GetName())
if err != nil {
return fmt.Errorf("fetching current instance for %q: %w", req.GetPlugin().GetName(), err)
}
// If new revision fails pre-launch steps, current plugin would keep running
// and new revision process would be aborted. If new revision succeeds,
// current plugin would be stopped and removed. Reset pending plugin status to
// avoid showing plugin install in progress.
defer currPlugin.resetPendingStatus()
// Current plugin will be removed as soon as new plugin launch is started
// below. Set the pending status to show new plugin revision install in
// progress. This will be captured by [ListPluginStates] and sent to ACS.
currPlugin.setPendingStatus(plugin.Revision, acpb.CurrentPluginStates_DaemonPluginState_INSTALLING)
// Two plugin revisions can co-exist on the same host, but only one of them
// can be running. Run pre-launch steps on new plugin revision to reduce
// plugin downtime and make sure it can be launched.
steps := m.preLaunchWorkflow(ctx, req, localPlugin)
galog.Infof("Running pre-upgrade steps for plugin %q", plugin.FullName())
if err := plugin.runSteps(ctx, steps); err != nil {
sendEvent(ctx, plugin, acpb.PluginEventMessage_PLUGIN_INSTALL_FAILED, fmt.Sprintf("Failed to run pre-upgrade steps: %v", err))
return fmt.Errorf("failed to run pre-upgrade steps: %w", err)
}
// Previously installed plugin revision already exists, remove before
// installing a new one.
galog.Infof("Stopping and removing old plugin %q", currPlugin.FullName())
if err := m.stopAndRemovePlugin(ctx, currPlugin); err != nil {
sendEvent(ctx, currPlugin, acpb.PluginEventMessage_PLUGIN_INSTALL_FAILED, fmt.Sprintf("Failed to remove plugin: %v", err))
return fmt.Errorf("failed to remove plugin: %w", err)
}
return m.runlaunchPluginSteps(ctx, plugin, []Step{m.newLaunchStep(req, localPlugin)})
}
// stopAndRemovePlugin stops the given plugin, all of its schedulers and removes
// it from the manager.
func (m *PluginManager) stopAndRemovePlugin(ctx context.Context, p *Plugin) error {
sendEvent(ctx, p, acpb.PluginEventMessage_PLUGIN_CONFIG_REMOVE, "Received request to remove a plugin.")
// Stop all schedulers running on the plugin so it doesn't interfere with the
// stop process.
m.stopMonitoring(p)
m.stopMetricsMonitoring(p)
if err := p.runSteps(ctx, []Step{&stopStep{cleanup: true}}); err != nil {
sendEvent(ctx, p, acpb.PluginEventMessage_PLUGIN_REMOVE_FAILED, fmt.Sprintf("Failed to remove plugin: %v", err))
return fmt.Errorf("unable to remove plugin %q: %w", p.FullName(), err)
}
sendEvent(ctx, p, acpb.PluginEventMessage_PLUGIN_REMOVED, "Successfully removed the plugin.")
m.delete(p.Name)
galog.Infof("Successfully removed plugin %q", p.FullName())
return nil
}
// removePlugin removes the plugin revision or ignores the request if plugin
// does not exist.
func (m *PluginManager) removePlugin(ctx context.Context, req *acpb.ConfigurePluginStates_ConfigurePlugin) error {
galog.Infof("Removing plugin %q, revision %s", req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId())
p, err := m.fetch(req.GetPlugin().GetName())
if err != nil {
sendEvent(ctx, &Plugin{Name: req.GetPlugin().GetName(), Revision: req.GetPlugin().GetRevisionId()}, acpb.PluginEventMessage_PLUGIN_REMOVE_FAILED, "Plugin not found.")
return fmt.Errorf("plugin %q not found", req.GetPlugin().GetName())
}
if err := m.stopAndRemovePlugin(ctx, p); err != nil {
return fmt.Errorf("failed to remove plugin %q: %w", p.FullName(), err)
}
// State directory should persist across plugin revisions and should be
// removed only when plugin is explicitly removed.
stateDir := p.stateDir()
if !file.Exists(stateDir, file.TypeDir) {
galog.Debugf("Plugin state directory %q does not exist, nothing to remove", stateDir)
return nil
}
return os.RemoveAll(p.stateDir())
}
// startMonitoring schedules a plugin monitoring job that ensures the
// plugin is running. If plugin is found unhealthy it is restarted.
func (m *PluginManager) startMonitoring(ctx context.Context, p *Plugin) {
galog.Infof("Starting plugin monitor job for plugin %q", p.FullName())
pm := NewPluginMonitor(p, healthCheckFrequency)
m.pluginMonitorMu.Lock()
m.pluginMonitors[p.FullName()] = pm.ID()
m.pluginMonitorMu.Unlock()
// ScheduleJob() throws error only if we try to schedule a job that should
// not be enabled (ShouldEnable() returns false). Ignoring this error
// here as in case of monitoring pm.ShouldEnable() always returns true.
m.scheduler.ScheduleJob(ctx, pm)
}
// stopMonitoring stops/removes the plugin monitoring job.
func (m *PluginManager) stopMonitoring(p *Plugin) {
galog.Infof("Removing plugin monitor job for plugin %q", p.FullName())
pm, ok := m.pluginMonitors[p.FullName()]
if !ok {
galog.Warnf("Plugin monitor not found for %q, ignoring stop monitor request", p.FullName())
return
}
m.scheduler.UnscheduleJob(pm)
m.pluginMonitorMu.Lock()
defer m.pluginMonitorMu.Unlock()
delete(m.pluginMonitors, p.FullName())
}
// startMetricsMonitoring schedules a plugin resource metrics collection job
// that collects and stores them in memory.
func (m *PluginManager) startMetricsMonitoring(ctx context.Context, p *Plugin) {
galog.Infof("Starting plugin metrics monitor job for plugin %q", p.FullName())
pm := NewPluginMetrics(p, p.Manifest.MetricsInterval)
// ScheduleJob() throws error only if we try to schedule a job that should
// not be enabled (ShouldEnable() returns false). Ignoring this error
// here as in case of monitoring pm.ShouldEnable() always returns true.
m.scheduler.ScheduleJob(ctx, pm)
m.pluginMetricsMu.Lock()
defer m.pluginMetricsMu.Unlock()
m.pluginMetricsMonitors[p.FullName()] = pm.ID()
}
// stopMetricsMonitoring stops/removes the plugin metrics monitoring job.
func (m *PluginManager) stopMetricsMonitoring(p *Plugin) {
galog.Infof("Removing plugin metrics monitor job for plugin %q", p.FullName())
m.pluginMetricsMu.Lock()
pm, ok := m.pluginMetricsMonitors[p.FullName()]
m.pluginMetricsMu.Unlock()
if !ok {
galog.Warnf("Plugin metrics monitor not found for %q, ignoring stop monitor request", p.FullName())
return
}
m.scheduler.UnscheduleJob(pm)
m.pluginMetricsMu.Lock()
defer m.pluginMetricsMu.Unlock()
delete(m.pluginMetricsMonitors, p.FullName())
}
// connectOrReLaunch connects to the plugin and launches the plugin if needed.
func connectOrReLaunch(ctx context.Context, p *Plugin) error {
if p.IsRunning(ctx) {
p.setState(acpb.CurrentPluginStates_DaemonPluginState_RUNNING)
return nil
}
galog.Debugf("Plugin %q is not running, relaunching", p.FullName())
if err := p.runSteps(ctx, relaunchWorkflow(ctx, p)); err != nil {
p.setState(acpb.CurrentPluginStates_DaemonPluginState_CRASHED)
return fmt.Errorf("failed to relaunch plugin %q: %w", p.FullName(), err)
}
return nil
}
// load loads the plugin information from directory and returns a map of
// plugins.
func load(stateDir string) (map[string]*Plugin, error) {
galog.Debugf("Loading plugin state from %s", stateDir)
plugins := make(map[string]*Plugin)
files, err := os.ReadDir(stateDir)
if err != nil {
if os.IsNotExist(err) {
// Plugin state might not exist yet to load from disk just log.
galog.Debugf("Plugin state directory %q does not exist, nothing to load", stateDir)
return plugins, nil
}
return nil, fmt.Errorf("unable to load plugin state from directory %s: %v", stateDir, err)
}
for _, f := range files {
if f.IsDir() {
galog.Debugf("Found unknown directory %q in %q, ignoring", f.Name(), stateDir)
continue
}
file := filepath.Join(stateDir, f.Name())
fh, err := os.Open(file)
if err != nil {
return nil, fmt.Errorf("unabled to read plugin state from %s: %w", file, err)
}
defer fh.Close()
plugin := &Plugin{}
if err := gob.NewDecoder(fh).Decode(plugin); err != nil {
return nil, fmt.Errorf("unable to decode plugin state file %s: %w", f, err)
}
plugin.RuntimeInfo.metrics = boundedlist.New[Metric](plugin.Manifest.MaxMetricDatapoints)
plugins[plugin.Name] = plugin
}
return plugins, nil
}
// sendEvent sends a plugin event on ACS channel.
func sendEvent(ctx context.Context, p *Plugin, evType acpb.PluginEventMessage_PluginEventType, details string) {
event := &acpb.PluginEventMessage{
PluginName: p.Name,
RevisionId: p.Revision,
EventType: evType,
EventTimestamp: tpb.New(time.Now()),
EventDetails: []byte(details),
}
// This might do a retry on the client side if it fails no point in blocking
// the caller.
go func() {
if err := client.Notify(ctx, event); err != nil {
// Just log the error, Notify() internally handles retrying the request
// if this fails there's nothing really we can do.
galog.Errorf("Failed to sent event notification [%+v]: %v", event, err)
}
}()
}