internal/plugin/manager/pluginengine.go (295 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package manager contains the plugin manager engine that handles the workflow.
package manager
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/GoogleCloudPlatform/galog"
acmpb "github.com/GoogleCloudPlatform/google-guest-agent/internal/acp/proto/google_guest_agent/acp"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/boundedlist"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/cfg"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/retry"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/utils/file"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
structpb "google.golang.org/protobuf/types/known/structpb"
)
// PluginType is the type of plugin.
type PluginType int
const (
// PluginTypeCore represents plugin is a core plugin. These are generally
// packaged with the Guest Agent and installed by package mangers and offer
// core Guest Agent functionality and are required to support various GCE
// features.
PluginTypeCore PluginType = iota
// PluginTypeDynamic represents plugin is a dynamic plugin. These type of
// plugins are optional plugins that are dynamically downloaded and installed
// by the Guest Agent.
PluginTypeDynamic
// pluginInstallDir is the directory under the agent base state directory
// where all plugins are installed.
pluginInstallDir = "plugins"
// pluginStateDir is the directory under the agent base state directory where
// all plugins store their state and is persisted across revisions.
pluginStateDir = "plugin_state"
)
// Plugin struct represents the plugin information.
type Plugin struct {
// PluginType identifies if the plugin type.
PluginType PluginType
// Name is the current plugin name.
Name string
// Revision is the current plugin revision.
Revision string
// Address is the current address plugin is listening on.
Address string
// InstallPath is the path to the directory where plugin is
// installed/unpacked.
InstallPath string
// EntryPath is the path to the plugin/binary entry point from which its spun
// up.
EntryPath string
// Protocol is the protocol used for communication with the plugin.
Protocol string
// client is grpc client connection with the plugin.
client *grpc.ClientConn
// Manifest is plugin configuration defining various agent/plugin behavior.
Manifest *Manifest
// RuntimeInfo holds plugin runtime information.
RuntimeInfo *RuntimeInfo
}
// RuntimeInfo represent plugin metrics and health check information captured at
// run time. Expect info here to change during plugin execution.
type RuntimeInfo struct {
// statusMu mutex protects concurrent updates to plugin status.
statusMu sync.RWMutex
// status is the current plugin status.
status acmpb.CurrentPluginStates_DaemonPluginState_StatusValue
// healthMu mutex protects plugin health check information.
healthMu sync.Mutex
// health is the current plugin health check information.
health *healthCheck
// metricsMu is a mutex that protects the metrics field.
metricsMu sync.Mutex
// metrics is a list of metrics reported by the plugin.
metrics *boundedlist.List[Metric]
// pidMu mutex protects plugin pid.
pidMu sync.RWMutex
// Pid is the process id of the plugin.
Pid int
// pendingStatusMu mutex protects pendingPluginStatus.
pendingStatusMu sync.Mutex
// pendingPluginStatus is the status of pending plugin revision. This is the
// new revision that is being installed.
pendingPluginStatus *pendingPluginStatus
}
// Manifest is the plugin specific static config agent received from ACP.
type Manifest struct {
// StartAttempts is the number of times to try launching the plugin.
StartAttempts int
// MaxMemoryUsage is the maximum allowed memory usage of the plugin, in bytes.
MaxMemoryUsage int64
// MaxCPUUsage is the maximum allowed percent CPU usage of the plugin.
MaxCPUUsage int32
// MaxMetricDatapoints is the maximum number of datapoints to report/collect.
// Metrics are collected every [MetricsInterval] but are flushed from memory
// only when reported back to the service. This count limits datapoints from
// growing indefinitely.
MaxMetricDatapoints uint
// MetricsInterval is the interval at which metrics are collected.
MetricsInterval time.Duration
// StopTimeout is the timeout set on plugin stop request before process is
// killed.
StopTimeout time.Duration
// StartTimeout is the timeout set on plugin start request.
StartTimeout time.Duration
// StartConfig is the config service has sent down for passing down to the
// plugin on each start RPC request.
StartConfig *ServiceConfig
// LaunchArguments are extra arguments specified by plugin owners to pass down
// during process launch.
LaunchArguments []string
}
// ServiceConfig is agent agnostic data that is passed to the plugin on every
// start rpc request. At any given time only one of this can be set.
type ServiceConfig struct {
// Simple is simple string form of the config.
Simple string
// Structured is structured [*structpb.Struct] config message. It is marshaled
// to a byte array to persist across agent restarts and reuse on every plugin
// start request. Agent will unmarshal using [toProto] method before sending
// it to plugins on Start RPC.
Structured []byte
}
// toProto unmarshals bytes and returns struct proto message representation.
func (c *ServiceConfig) toProto() (*structpb.Struct, error) {
cfg := &structpb.Struct{}
if err := proto.Unmarshal(c.Structured, cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal struct config bytes: %w", err)
}
return cfg, nil
}
func (p *Plugin) resetPendingStatus() {
p.RuntimeInfo.pendingStatusMu.Lock()
defer p.RuntimeInfo.pendingStatusMu.Unlock()
p.RuntimeInfo.pendingPluginStatus = nil
}
func (p *Plugin) setPendingStatus(revision string, status acmpb.CurrentPluginStates_DaemonPluginState_StatusValue) {
p.RuntimeInfo.pendingStatusMu.Lock()
defer p.RuntimeInfo.pendingStatusMu.Unlock()
p.RuntimeInfo.pendingPluginStatus = &pendingPluginStatus{
revision: revision,
status: status,
}
}
func (p *Plugin) pendingStatus() *pendingPluginStatus {
p.RuntimeInfo.pendingStatusMu.Lock()
defer p.RuntimeInfo.pendingStatusMu.Unlock()
return p.RuntimeInfo.pendingPluginStatus
}
// pendingPluginStatus struct represents the pending plugin status. This is
// set only when a plugin revision is being changed.
type pendingPluginStatus struct {
// revision is the pending plugin revision.
revision string
// status is the pending plugin status.
status acmpb.CurrentPluginStates_DaemonPluginState_StatusValue
}
// healthCheck struct represents the health check information.
type healthCheck struct {
// responseCode is the response code returned by plugin during health check.
responseCode int32
// messages is the list of messages returned by plugin during health check.
// This could include potential error reasons or any info plugins might want
// to report to the service.
messages []string
// timestamp is the timestamp at which the health check was executed.
timestamp time.Time
}
// healthInfo returns the current cached plugin health check information.
func (p *Plugin) healthInfo() *healthCheck {
p.RuntimeInfo.healthMu.Lock()
defer p.RuntimeInfo.healthMu.Unlock()
return p.RuntimeInfo.health
}
// setHealthInfo sets the plugin health check information.
func (p *Plugin) setHealthInfo(h *healthCheck) {
p.RuntimeInfo.healthMu.Lock()
defer p.RuntimeInfo.healthMu.Unlock()
p.RuntimeInfo.health = h
}
// FullName returns the full name of the plugin including name and revision.
func (p *Plugin) FullName() string {
return fmt.Sprintf("%s_%s", p.Name, p.Revision)
}
// setPid sets the current plugin process id.
func (p *Plugin) setPid(pid int) {
p.RuntimeInfo.pidMu.Lock()
defer p.RuntimeInfo.pidMu.Unlock()
p.RuntimeInfo.Pid = pid
}
// pid returns the current plugin process id.
func (p *Plugin) pid() int {
p.RuntimeInfo.pidMu.RLock()
defer p.RuntimeInfo.pidMu.RUnlock()
return p.RuntimeInfo.Pid
}
// setState sets the plugin status.
func (p *Plugin) setState(s acmpb.CurrentPluginStates_DaemonPluginState_StatusValue) {
p.RuntimeInfo.statusMu.Lock()
defer p.RuntimeInfo.statusMu.Unlock()
p.RuntimeInfo.status = s
}
// State returns the plugin status.
func (p *Plugin) State() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue {
p.RuntimeInfo.statusMu.RLock()
defer p.RuntimeInfo.statusMu.RUnlock()
return p.RuntimeInfo.status
}
// runSteps runs the steps in the order they are given.
func (p *Plugin) runSteps(ctx context.Context, steps []Step) error {
for _, step := range steps {
galog.Debugf("Running %q on plugin %q", step.Name(), p.FullName())
p.setState(step.Status())
if err := ctx.Err(); err != nil {
return fmt.Errorf("%q failed, context error: %w", step.Name(), err)
}
if err := step.Run(ctx, p); err != nil {
p.setState(step.ErrorStatus())
return fmt.Errorf("%q failed with error: %w", step.Name(), err)
}
}
return nil
}
// baseState returns the base path where the agent and all plugins state is
// stored.
func baseState() string {
return filepath.Join(filepath.Clean(cfg.Retrieve().Plugin.StateDir), pluginManager.currentInstanceID())
}
// pluginInstallPath returns the path where the plugin is installed. This is
// the path where the plugin is unpacked and the entry point is executed from.
func pluginInstallPath(name, revision string) string {
return filepath.Join(baseState(), pluginInstallDir, fmt.Sprintf("%s_%s", name, revision))
}
// Step represents an interface for each step run as part of a plugin
// configuration.
type Step interface {
// The name of the step.
Name() string
// Status returns the plugin state for current step.
Status() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue
// ErrorStatus returns the plugin state if current step fails.
ErrorStatus() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue
// Performs the step.
Run(context.Context, *Plugin) error
}
// downloadStep represents the download step of plugin install.
type downloadStep struct {
// url is the GCS signed URL of the plugin package.
url string
// targetPath is the path to the target file where the package is downloaded.
targetPath string
// checksum is the expected sha256sum of the downloaded package.
checksum string
// attempts is the number of times to try downloading the package in case of
// failure.
attempts int
// timeout is the timeout for the download.
timeout time.Duration
}
// Name returns the name of the step.
func (d *downloadStep) Name() string { return "DownloadPluginStep" }
// Status returns the plugin state for current step.
func (d *downloadStep) Status() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue {
return acmpb.CurrentPluginStates_DaemonPluginState_INSTALLING
}
// ErrorStatus returns the plugin state if download step fails.
func (d *downloadStep) ErrorStatus() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue {
return acmpb.CurrentPluginStates_DaemonPluginState_INSTALL_FAILED
}
// Run downloads a package from GCS and validates the checksum.
func (d *downloadStep) Run(ctx context.Context, _ *Plugin) error {
// Try downloading package d.attempts times with 2 second interval.
policy := retry.Policy{MaxAttempts: d.attempts, BackoffFactor: 1, Jitter: time.Second * 2}
return retry.Run(ctx, policy, func() error {
client := http.Client{
Timeout: d.timeout,
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil)
if err != nil {
return fmt.Errorf("failed to create request for %q: %w", d.url, err)
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to download from %q: %w", d.url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to download from %q, bad status: %q", d.url, resp.Status)
}
dir := filepath.Dir(d.targetPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %q: %w", dir, err)
}
f, err := os.Create(d.targetPath)
if err != nil {
return fmt.Errorf("unable to create file %q: %w", d.targetPath, err)
}
defer f.Close()
if _, err = io.Copy(f, resp.Body); err != nil {
return fmt.Errorf("unable to copy response body to file %q: %w", f.Name(), err)
}
calculated, err := file.SHA256FileSum(f.Name())
if err != nil {
return fmt.Errorf("failed to calculate sha256sum of file %q: %w", f.Name(), err)
}
if calculated != d.checksum {
return fmt.Errorf("file %q has different sha256sum: %q != %q", f.Name(), calculated, d.checksum)
}
galog.Debugf("Successfully downloaded %q to %q", d.url, d.targetPath)
return nil
})
}
// unpackStep represents the unpack step of plugin install.
type unpackStep struct {
// archivePath is the path to the archive.
archivePath string
// targetDir is the path to the target directory where archive is unpacked.
targetDir string
}
// Name returns the name of the step.
func (u *unpackStep) Name() string { return "UnpackPluginArchiveStep" }
// Status returns the plugin state for current step.
func (u *unpackStep) Status() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue {
return acmpb.CurrentPluginStates_DaemonPluginState_INSTALLING
}
// ErrorStatus returns the plugin state if unpack step fails.
func (u *unpackStep) ErrorStatus() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue {
return acmpb.CurrentPluginStates_DaemonPluginState_INSTALL_FAILED
}
// Run unpacks to the target directory and deletes the archive file.
func (u *unpackStep) Run(ctx context.Context, p *Plugin) error {
// Make sure we unpack in to a clean directory. No previous unpack state
// is retained on install.
if err := os.RemoveAll(u.targetDir); err != nil {
return fmt.Errorf("failed to cleanup %q: %w", u.targetDir, err)
}
if err := file.UnpackTargzFile(u.archivePath, u.targetDir); err != nil {
return fmt.Errorf("failed to unpack %q to directory %q: %w", u.archivePath, u.targetDir, err)
}
// Cleanup archive file on successful decompression.
if err := os.RemoveAll(u.archivePath); err != nil {
return fmt.Errorf("failed to remove %q: %w", u.archivePath, err)
}
p.InstallPath = u.targetDir
galog.Debugf("Successfully unpacked %q to %q", u.archivePath, u.targetDir)
return nil
}
// generateInstallWorkflow generates the workflow for a plugin installation.
func (m *PluginManager) generateInstallWorkflow(ctx context.Context, req *acmpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) []Step {
var steps []Step
if req.GetAction() != acmpb.ConfigurePluginStates_INSTALL {
return steps
}
steps = append(steps, m.preLaunchWorkflow(ctx, req, localPlugin)...)
steps = append(steps, m.newLaunchStep(req, localPlugin))
return steps
}
func (m *PluginManager) newLaunchStep(req *acmpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) Step {
state := pluginInstallPath(req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId())
l := &launchStep{
entryPath: filepath.Join(state, req.GetPlugin().GetEntryPoint()),
maxMemoryUsage: req.GetManifest().GetMaxMemoryUsageBytes(),
maxCPUUsage: req.GetManifest().GetMaxCpuUsagePercentage(),
startAttempts: int(req.GetManifest().GetStartAttemptCount()),
protocol: m.protocol,
extraArgs: req.GetPlugin().GetArguments(),
}
if localPlugin {
// Since plugin is already present on disk entry point is not prepended with
// state directory (install path as its done for dynamic plugins).
l.entryPath = req.GetPlugin().GetEntryPoint()
}
return l
}
// relaunchWorkflow generates the workflow for a re-launching a plugin.
func relaunchWorkflow(ctx context.Context, p *Plugin) []Step {
// Run stop to make sure plugin process is not alive.
// Relaunch means we're not removing plugin, always set cleanup to false.
s := &stopStep{cleanup: false}
l := &launchStep{
entryPath: p.EntryPath,
maxMemoryUsage: p.Manifest.MaxMemoryUsage,
maxCPUUsage: p.Manifest.MaxCPUUsage,
startAttempts: p.Manifest.StartAttempts,
protocol: p.Protocol,
extraArgs: p.Manifest.LaunchArguments,
}
return []Step{s, l}
}
// preLaunchWorkflow generates the workflow to run before attempting any action
// on a plugin revision.
func (m *PluginManager) preLaunchWorkflow(ctx context.Context, req *acmpb.ConfigurePluginStates_ConfigurePlugin, localPlugin bool) []Step {
// Plugins that are already on disk (generally installed by package manager)
// don't need download/unpack step, simply launch them.
if localPlugin {
return nil
}
statePath := baseState()
state := pluginInstallPath(req.GetPlugin().GetName(), req.GetPlugin().GetRevisionId())
archivePath := filepath.Join(statePath, req.GetPlugin().GetName()+".tar.gz")
d := &downloadStep{
url: req.GetPlugin().GetGcsSignedUrl(),
targetPath: archivePath,
timeout: time.Duration(req.GetManifest().GetDownloadTimeout().GetSeconds()) * time.Second,
attempts: int(req.GetManifest().GetDownloadAttemptCount()),
checksum: req.GetPlugin().GetChecksum(),
}
u := &unpackStep{
archivePath: archivePath,
targetDir: state,
}
return []Step{d, u}
}