internal/plugin/manager/plugin.go (163 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"os"
"path/filepath"
"time"
"github.com/GoogleCloudPlatform/galog"
pb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
apb "google.golang.org/protobuf/types/known/anypb"
dpb "google.golang.org/protobuf/types/known/durationpb"
)
const (
	// defaultApplyRPCTimeout is the deadline applied to an Apply RPC so the
	// Guest Agent does not end up waiting on a plugin response forever.
	defaultApplyRPCTimeout = 5 * time.Second
	// defaultStatusRPCTimeout is the deadline applied to a GetStatus RPC, for
	// the same reason.
	defaultStatusRPCTimeout = 2 * time.Second
)
// PluginService wraps the plugin's current client connection in a
// GuestAgentPlugin service client and returns it. A fresh wrapper is built on
// every call.
func (p *Plugin) PluginService() pb.GuestAgentPluginClient {
	svc := pb.NewGuestAgentPluginClient(p.client)
	return svc
}
// buildStartRequest assembles the Start RPC request for this plugin. The
// request always carries the plugin's state directory; the service config
// supplied during install is attached only when the manifest has one.
//
// An error is returned only when the structured config cannot be converted to
// its proto form. ctx is currently unused.
func (p *Plugin) buildStartRequest(ctx context.Context) (*pb.StartRequest, error) {
	startReq := &pb.StartRequest{
		Config: &pb.StartRequest_Config{StateDirectoryPath: p.stateDir()},
	}

	// Start config is optional and may not be present.
	cfg := p.Manifest.StartConfig
	if cfg == nil {
		return startReq, nil
	}

	switch {
	case len(cfg.Simple) != 0:
		// The simple string form wins when both forms are populated.
		startReq.ServiceConfig = &pb.StartRequest_StringConfig{StringConfig: cfg.Simple}
	case len(cfg.Structured) != 0:
		structCfg, err := cfg.toProto()
		if err != nil {
			return nil, fmt.Errorf("unable to generate start request for %q plugin: %w", p.FullName(), err)
		}
		startReq.ServiceConfig = &pb.StartRequest_StructConfig{StructConfig: structCfg}
	}

	return startReq, nil
}
// Start makes plugin RPC start request.
//
// The request is built from the manifest by buildStartRequest and issued
// through the retry helper with MaxAttempts set to Manifest.StartAttempts. One
// context bounded by Manifest.StartTimeout is handed both to the retry helper
// and to every attempt, so the timeout covers the whole operation rather than
// each attempt individually.
//
// An error status is returned, without issuing the RPC, when the plugin has
// no client connection or when the request cannot be built.
func (p *Plugin) Start(ctx context.Context) (*pb.StartResponse, *status.Status) {
	galog.Debugf("Executing start request on plugin %q", p.FullName())

	// Mirror the guard in Stop: issuing an RPC on a nil connection would panic
	// instead of surfacing an error status to the caller.
	if p.client == nil {
		return nil, status.Convert(fmt.Errorf("plugin %q is not connected, cannot call Start RPC", p.FullName()))
	}

	policy := retry.Policy{MaxAttempts: p.Manifest.StartAttempts, BackoffFactor: 1, Jitter: time.Second}

	req, err := p.buildStartRequest(ctx)
	if err != nil {
		return nil, status.Convert(err)
	}

	tCtx, cancel := context.WithTimeout(ctx, p.Manifest.StartTimeout)
	defer cancel()

	f := func() (*pb.StartResponse, error) {
		return p.PluginService().Start(tCtx, req, grpc.WaitForReady(true))
	}

	resp, err := retry.RunWithResponse(tCtx, policy, f)
	return resp, status.Convert(err)
}
// Stop makes plugin RPC stop request.
//
// cleanup is forwarded to the plugin in the request. The request advertises
// Manifest.StopTimeout as its deadline, and the RPC itself is bounded by the
// same timeout. An error status is returned, without issuing the RPC, when
// the plugin has no client connection.
func (p *Plugin) Stop(ctx context.Context, cleanup bool) (*pb.StopResponse, *status.Status) {
	galog.Debugf("Executing stop request on plugin %q", p.FullName())

	if p.client == nil {
		return nil, status.Convert(fmt.Errorf("plugin %q is not connected, cannot call Stop RPC", p.FullName()))
	}

	req := &pb.StopRequest{
		Cleanup: cleanup,
		// dpb.New carries the full duration (seconds and nanos). Building the
		// Duration from whole seconds alone would advertise a shorter deadline
		// than the one enforced below, e.g. 0s for any sub-second timeout.
		Deadline: dpb.New(p.Manifest.StopTimeout),
	}

	tCtx, cancel := context.WithTimeout(ctx, p.Manifest.StopTimeout)
	defer cancel()

	resp, err := p.PluginService().Stop(tCtx, req, grpc.WaitForReady(true))
	return resp, status.Convert(err)
}
// Apply makes plugin RPC apply request.
//
// reqBytes is sent as the Value of an Any message; no type URL is set. The
// RPC is bounded by defaultApplyRPCTimeout. An error status is returned,
// without issuing the RPC, when the plugin has no client connection.
func (p *Plugin) Apply(ctx context.Context, reqBytes []byte) (*pb.ApplyResponse, *status.Status) {
	galog.Debugf("Executing apply request on plugin %q", p.FullName())

	// Mirror the guard in Stop: issuing an RPC on a nil connection would panic
	// instead of surfacing an error status to the caller.
	if p.client == nil {
		return nil, status.Convert(fmt.Errorf("plugin %q is not connected, cannot call Apply RPC", p.FullName()))
	}

	req := &pb.ApplyRequest{
		Data: &apb.Any{Value: reqBytes},
	}

	tCtx, cancel := context.WithTimeout(ctx, defaultApplyRPCTimeout)
	defer cancel()

	resp, err := p.PluginService().Apply(tCtx, req, grpc.WaitForReady(true))
	return resp, status.Convert(err)
}
// GetStatus makes the GetStatus RPC request. [req] provides the context on
// what the request is about. For e.g. if we want status for task A, context
// could be task ID. For regular health check leave it empty.
//
// An empty req leaves the request's Data field unset. The RPC is bounded by
// defaultStatusRPCTimeout. An error status is returned, without issuing the
// RPC, when the plugin has no client connection.
func (p *Plugin) GetStatus(ctx context.Context, req string) (*pb.Status, *status.Status) {
	galog.Debugf("Executing get status request (%s) on plugin %q", req, p.FullName())

	// Mirror the guard in Stop: issuing an RPC on a nil connection would panic
	// instead of surfacing an error status to the caller.
	if p.client == nil {
		return nil, status.Convert(fmt.Errorf("plugin %q is not connected, cannot call GetStatus RPC", p.FullName()))
	}

	// Data stays nil for an empty req, so it is omitted from the request
	// rather than sent as an empty string.
	var data *string
	if req != "" {
		data = proto.String(req)
	}

	tCtx, cancel := context.WithTimeout(ctx, defaultStatusRPCTimeout)
	defer cancel()

	r := &pb.GetStatusRequest{Data: data}
	resp, err := p.PluginService().GetStatus(tCtx, r, grpc.WaitForReady(true))
	return resp, status.Convert(err)
}
// connectAddress builds the gRPC target for this plugin. TCP plugins use the
// configured address as is; every other protocol gets the address prefixed
// with the UDS scheme. See
// https://github.com/grpc/grpc/blob/master/doc/naming.md for the address
// naming convention.
func (p *Plugin) connectAddress() string {
	target := p.Address
	if p.Protocol != tcpProtocol {
		target = fmt.Sprintf("%s:%s", udsProtocol, p.Address)
	}
	return target
}
// Connect tries to establish grpc connection to the plugin server.
//
// Any previously held client connection is closed and dropped first. The new
// client targets connectAddress() and uses insecure transport credentials. On
// failure the plugin is left without a client connection. ctx is currently
// unused.
func (p *Plugin) Connect(ctx context.Context) error {
	galog.Debugf("Dialing in on plugin %q", p.FullName())

	if p.client != nil {
		// Close the previous client connection before attempting to reconnect.
		// A failed close must not block the reconnect, so it is logged rather
		// than returned.
		if err := p.client.Close(); err != nil {
			galog.Debugf("Failed to close previous connection to plugin %q: %v", p.FullName(), err)
		}
		p.client = nil
	}

	options := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	conn, err := grpc.NewClient(p.connectAddress(), options...)
	if err != nil {
		return fmt.Errorf("failed to dial on %q: %w", p.Address, err)
	}

	p.client = conn
	return nil
}
// stateFile returns the location of this plugin's state file: a file named
// after the plugin with a ".gob" extension inside the agent plugin state
// directory.
func (p *Plugin) stateFile() string {
	dir := agentPluginState()
	return filepath.Join(dir, p.Name+".gob")
}
// Store writes plugin information to the file.
//
// The plugin is gob-encoded into memory before the state file is opened, so
// an encoding failure does not truncate an existing state file. The parent
// directory is created when missing. An error is returned if the directory
// cannot be created, the plugin cannot be encoded, or the file cannot be
// opened, written or closed.
func (p *Plugin) Store() error {
	fname := p.stateFile()
	dir := filepath.Dir(fname)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("unable to create directory %q: %w", dir, err)
	}

	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(p); err != nil {
		return fmt.Errorf("unable to encode plugin: %w", err)
	}

	fh, err := os.OpenFile(fname, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Errorf("open file %q: %w", fname, err)
	}

	if _, err := fh.Write(b.Bytes()); err != nil {
		// The write error is the one worth reporting; closing here is best
		// effort and only releases the descriptor.
		fh.Close()
		return fmt.Errorf("write plugin info to %q: %w", fname, err)
	}

	// A close error on a freshly written file may mean the data was not
	// persisted, so it is reported instead of being dropped by a deferred
	// Close.
	if err := fh.Close(); err != nil {
		return fmt.Errorf("close file %q: %w", fname, err)
	}

	galog.V(2).Debugf("Sucessfully wrote plugin info to %q", fname)
	return nil
}
// IsRunning reports whether the plugin appears to be running. It reconnects
// to the plugin and then issues a GetStatus call with an empty request as a
// health check; a failure in either step is logged at debug level and
// reported as not running.
func (p *Plugin) IsRunning(ctx context.Context) bool {
	galog.Debugf("Checking if plugin %q is running", p.FullName())

	connErr := p.Connect(ctx)
	if connErr != nil {
		galog.Debugf("Failed to connect to plugin %q: %v", p.FullName(), connErr)
		return false
	}

	_, st := p.GetStatus(ctx, "")
	healthy := st.Err() == nil
	if !healthy {
		// Health check failed, plugin is probably not running.
		galog.Debugf("Plugin health check failed, unable to get status of plugin %q: %+v", p.FullName(), st)
	}
	return healthy
}
// stateDir returns the scratch directory for this plugin, where the plugin
// can keep any state it needs to persist across revisions. The path is built
// from the base state directory, the agent state directory, the plugin
// install directory and the plugin name.
func (p *Plugin) stateDir() string {
	segments := []string{baseState(), agentStateDir, pluginInstallDir, p.Name}
	return filepath.Join(segments...)
}
// logfile returns the path to this plugin's log file, "plugin.log" inside
// the plugin state directory. The plugin writes these logs; the agent
// collects them and pushes them out to the ACS when a plugin crash is
// detected.
// This log file is not meant for general logging, only for error logs that
// plugins want the agent to collect and send to ACS. The file is truncated
// after every flush.
func (p *Plugin) logfile() string {
	dir := p.stateDir()
	return filepath.Join(dir, "plugin.log")
}
// staticInstallPath returns the install path that stays the same across
// revisions: the plugin name under the plugin install directory of the base
// state directory. This will eventually become a symlink to the latest
// running plugin revision.
func (p *Plugin) staticInstallPath() string {
	segments := []string{baseState(), pluginInstallDir, p.Name}
	return filepath.Join(segments...)
}