internal/plugin/manager/pluginlauncher.go (181 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package manager import ( "context" "fmt" "net" "os" "path/filepath" "runtime" "strings" "time" "github.com/GoogleCloudPlatform/galog" "github.com/GoogleCloudPlatform/google-guest-agent/internal/cfg" "github.com/GoogleCloudPlatform/google-guest-agent/internal/resource" "github.com/GoogleCloudPlatform/google-guest-agent/internal/retry" "github.com/GoogleCloudPlatform/google-guest-agent/internal/run" "github.com/GoogleCloudPlatform/google-guest-agent/internal/utils/file" acmpb "github.com/GoogleCloudPlatform/google-guest-agent/internal/acp/proto/google_guest_agent/acp" ) const ( // udsProtocol is UDS protocol name to use for gRPC communication. udsProtocol = "unix" // tcpProtocol is TCP protocol name to use for gRPC communication. tcpProtocol = "tcp" ) // launchStep implements the plugin launch. type launchStep struct { // entryPath is the path to binary to launch plugin. entryPath string // maxMemoryUsage is the allowed maximum memory usage, in bytes, for the // plugin. maxMemoryUsage int64 // maxCPUUsage is the maximum allowed CPU usage in percentage for the plugin. maxCPUUsage int32 // startAttempts is the number of times to attempt to launch the plugin. startAttempts int // protocol (TCP/UDS) is the protocol used for gRPC communication. protocol string // extraArgs are additional arguments (opaque to agent) set by plugin writers // to pass down on process launch. extraArgs []string } // Name returns the name of the step. func (l *launchStep) Name() string { return "LaunchPluginStep" } // Status returns the plugin state for current step. func (l *launchStep) Status() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue { return acmpb.CurrentPluginStates_DaemonPluginState_STARTING } // Status returns the plugin state for current step. func (l *launchStep) ErrorStatus() acmpb.CurrentPluginStates_DaemonPluginState_StatusValue { return acmpb.CurrentPluginStates_DaemonPluginState_CRASHED } // Run unpacks to the target directory and deletes the archive file. func (l *launchStep) Run(ctx context.Context, p *Plugin) error { policy := retry.Policy{MaxAttempts: l.startAttempts, BackoffFactor: 1, Jitter: time.Second} addr, err := address(ctx, l.protocol, p.FullName(), policy) if err != nil { return fmt.Errorf("failed to get address: %w", err) } p.Address = addr p.Manifest.MaxMemoryUsage = l.maxMemoryUsage p.Manifest.MaxCPUUsage = l.maxCPUUsage p.Manifest.StartAttempts = l.startAttempts p.EntryPath = l.entryPath p.Protocol = l.protocol p.Manifest.LaunchArguments = l.extraArgs stateDir := p.stateDir() // Create state directory for the plugin. if err := os.MkdirAll(stateDir, 0755); err != nil { return fmt.Errorf("create state directory %q for plugin %q: %w", stateDir, p.FullName(), err) } // Update plugin path symlink to point to latest. Core plugin install path is // static and does not change across revisions. For instance on Linux it is // always /usr/lib/google/guest_agent/core_plugin. Skip symlink setup for core // plugin. if p.PluginType != PluginTypeCore { if err := file.UpdateSymlink(p.staticInstallPath(), p.InstallPath); err != nil { return fmt.Errorf("UpdateSymlink(%q, %q) failed for plugin %q: %w", p.staticInstallPath(), p.InstallPath, p.FullName(), err) } } args := p.Manifest.LaunchArguments args = append(args, fmt.Sprintf("--protocol=%s", l.protocol), fmt.Sprintf("--address=%s", p.Address), fmt.Sprintf("--errorlogfile=%s", p.logfile())) launchFunc := func() error { opts := run.Options{ Name: p.EntryPath, Args: args, ExecMode: run.ExecModeDetach, } res, err := run.WithContext(ctx, opts) if err != nil { return fmt.Errorf("failed to launch plugin from %q: %w", p.EntryPath, err) } p.setPid(res.Pid) return nil } // Launch the plugin. if err := retry.Run(ctx, policy, launchFunc); err != nil { sendEvent(ctx, p, acmpb.PluginEventMessage_PLUGIN_START_FAILED, fmt.Sprintf("Failed to launch plugin: [%v]. Plugin logs: %s", err, readPluginLogs(p.logfile()))) p.setState(acmpb.CurrentPluginStates_DaemonPluginState_CRASHED) return err } pluginPid := p.pid() constraintFunc := func() error { constraint := resource.Constraint{ PID: pluginPid, Name: p.FullName(), MaxMemoryUsage: p.Manifest.MaxMemoryUsage, MaxCPUUsage: p.Manifest.MaxCPUUsage, } if err := resource.Apply(constraint); err != nil { return err } return nil } // Apply resource constraint. if err := retry.Run(ctx, policy, constraintFunc); err != nil { sendEvent(ctx, p, acmpb.PluginEventMessage_PLUGIN_START_FAILED, fmt.Sprintf("Failed to apply resource constraint: [%v]", err)) p.setState(acmpb.CurrentPluginStates_DaemonPluginState_CRASHED) return err } galog.Debugf("Launched a plugin process from %q with pid %d", p.EntryPath, pluginPid) if err := p.Connect(ctx); err != nil { p.setState(acmpb.CurrentPluginStates_DaemonPluginState_CRASHED) return fmt.Errorf("failed to connect plugin %s: %w", p.FullName(), err) } _, status := p.Start(ctx) if status.Err() != nil { sendEvent(ctx, p, acmpb.PluginEventMessage_PLUGIN_START_FAILED, fmt.Sprintf("Failed to start plugin: [%+v]. Plugin logs: %s", status, readPluginLogs(p.logfile()))) p.setState(acmpb.CurrentPluginStates_DaemonPluginState_CRASHED) return fmt.Errorf("failed to start plugin %s with error: %w", p.FullName(), status.Err()) } sendEvent(ctx, p, acmpb.PluginEventMessage_PLUGIN_STARTED, "Successfully started the plugin.") p.setState(acmpb.CurrentPluginStates_DaemonPluginState_RUNNING) galog.Infof("Successfully started plugin %q", p.FullName()) if err := p.Store(); err != nil { return fmt.Errorf("store plugin %s info failed: %w", p.FullName(), err) } return nil } // connectionsPath returns the path to the socket connections directory. func connectionsPath() string { return filepath.Clean(cfg.Retrieve().Plugin.SocketConnectionsDir) } // connectionSetup creates the socket connections directory and removes the // previous socket address file if it exists. Do this setup before plugin launch // so every plugin doesn't need to handle this separately. func connectionSetup(address string) error { dir := filepath.Dir(address) if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("create %q connections directory: %w", dir, err) } // Skip cleanup for unit test addresses, unit tests handles itself. if strings.Contains(address, os.TempDir()) && strings.Contains(address, "Test") { return nil } if file.Exists(address, file.TypeFile) { galog.Debugf("Cleaning up previous socket address file %q", address) if err := os.RemoveAll(address); err != nil { // In case of unix sockets they must be unlinked (listener.Close()) before // they're reused again. If file already exist bind can fail. return fmt.Errorf("failed to remove socket address file %q: %v", address, err) } } return nil } // address returns the socket address or port to use for gRPC communication. func address(ctx context.Context, protocol, id string, policy retry.Policy) (string, error) { if protocol == udsProtocol { addr := filepath.Join(connectionsPath(), id+".sock") if err := connectionSetup(addr); err != nil { return "", fmt.Errorf("failed to setup socket connections directory: %w", err) } return addr, nil } // If using TCP find a free open port ready to use. f := func() (string, error) { listener, err := net.Listen("tcp", ":0") if err != nil { return "", fmt.Errorf("failed to get new TCP listener: %w", err) } addr := listener.Addr().String() if err := listener.Close(); err != nil { return "", fmt.Errorf("failed to close new TCP listener: %w", err) } return addr, nil } return retry.RunWithResponse(ctx, policy, f) } // isUDSSupported returns true if UDS is supported on Windows. Instead of going // by version to figure out if UDS is supported, try listening on test address // using UDS, if it gets listener successfully consider UDS is supported. func isUDSSupported() bool { if runtime.GOOS == "linux" { return true } connDir := connectionsPath() if err := os.MkdirAll(connDir, 0755); err != nil { galog.Debugf("Failed to test if UDS is supported, could not create connections directory: %v", err) return false } sockAddr := filepath.Join(connDir, "test-connection") defer func() { if err := os.RemoveAll(sockAddr); err != nil { galog.Debugf("Failed to remove socket file %q: %v", sockAddr, err) } }() listener, err := net.Listen(udsProtocol, sockAddr) if err != nil { galog.Infof("Failed to listen on %q, UDS is unsupported?: %v", sockAddr, err) return false } // Ignore these errors - we just need to know if UDS is supported and this // address is not reused. if err := listener.Close(); err != nil { galog.Debugf("Failed to close UDS listener: %v", err) } return true }