cmd/core_plugin/service.go (142 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"github.com/GoogleCloudPlatform/galog"
"github.com/GoogleCloudPlatform/google-guest-agent/cmd/core_plugin/stages/early"
"github.com/GoogleCloudPlatform/google-guest-agent/cmd/core_plugin/stages/late"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/command"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/events"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/logger"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/metadata"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/manager"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
pb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
)
// initStatusRequest is the GetStatus request payload used to ask whether early
// initialization has completed.
const initStatusRequest = "early-initialization"
// pluginServer is the core plugin server that implements the plugin interface.
// It is assigned by initPluginServer and read by handleVMEvent to trigger the
// Stop workflow on a shutdown event.
var pluginServer *PluginServer
// initPluginServer initializes the core plugin server and starts serving
// requests from Guest Agent.
//
// It listens on the package-level protocol/address pair, stores the newly
// created server in the pluginServer global and then blocks in Serve. It
// returns an error if listening fails or if Serve returns an error; the
// underlying error is wrapped so callers can inspect it with errors.Is/As.
func initPluginServer() error {
	listener, err := net.Listen(protocol, address)
	if err != nil {
		return fmt.Errorf("start listening on %q using %q: %w", address, protocol, err)
	}
	defer listener.Close()

	// This is the grpc server in communication with the Guest Agent.
	server := grpc.NewServer()
	pluginServer = &PluginServer{server: server}

	// Successfully registering the server and starting to listen on the address
	// offered means Guest Agent was successful in installing/launching the
	// plugin and will manage the lifecycle (start, stop, or revision change)
	// from here onwards.
	pb.RegisterGuestAgentPluginServer(server, pluginServer)

	if err := server.Serve(listener); err != nil {
		return fmt.Errorf("cannot continue serving on %q: %w", address, err)
	}
	return nil
}
// runAgent installs the signal handler, runs the early and then the late
// initialization stages and finally blocks in the event manager. A failure in
// any of these steps is reported through logAndExit. When the function returns
// the grpc server is gracefully stopped.
func (ps *PluginServer) runAgent(ctx context.Context) {
	galog.Infof("Running core plugin...")

	// An external signal is treated as a stop request without cleanup. If it
	// was Guest Agent trying to stop the plugin it would call the [Stop] RPC
	// directly, or send a [SIGKILL] which cannot be intercepted anyway.
	onSignal := func(_ os.Signal) {
		ps.Stop(ctx, &pb.StopRequest{Cleanup: false})
	}
	sigHandler(ctx, onSignal)

	// Early platform initialization. Every step executed in this phase assumes
	// the metadata server is not accessible yet. Running it in a separate
	// goroutine rather than within the [Start] RPC is fine because the
	// [GetStatus: early-initialization] check lets Guest Agent learn that the
	// core plugin has successfully initialized.
	earlyErr := early.Retrieve().Run(ctx)
	if earlyErr != nil {
		logAndExit(fmt.Sprintf("Failed to run early initialization: %v", earlyErr))
	}
	galog.Infof("Initialized (version: %q)", version)

	// Late modules initialization. From this point on the code assumes the
	// metadata server is accessible and the platform is fully initialized. An
	// error reported here belongs to the pre or post module initialization;
	// per-module initialization errors are logged and the module is marked as
	// disabled.
	lateErr := late.Retrieve().Run(ctx)
	if lateErr != nil {
		logAndExit(fmt.Sprintf("Failed to run late initialization: %v", lateErr))
	}

	stopServer := func() {
		galog.Infof("Stopping core plugin...")
		ps.server.GracefulStop()
	}
	defer stopServer()

	// The event manager effectively runs for the life-time of the process. It
	// returns when all watchers are done or the context is closed. The MDS
	// watcher is never removed and keeps the runner alive.
	runErr := events.FetchManager().Run(ctx)
	if runErr != nil {
		logAndExit(fmt.Sprintf("Failed to run event manager: %v", runErr))
	}
}
// sigHandler registers for SIGTERM, SIGINT, SIGQUIT and SIGHUP and starts a
// goroutine that waits for either the first of those signals or for ctx to be
// done.
//
// The function provided in the cancel argument handles internal framework
// termination and the plugin interface notification of the "exiting" state. It
// is invoked at most once, with the signal that was received. If ctx is done
// first, cancel is never invoked.
func sigHandler(ctx context.Context, cancel func(sig os.Signal)) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGHUP)

	go func() {
		// Unregister the channel on every exit path. The channel must not be
		// closed while it is still registered with Notify: a subsequent signal
		// would make the runtime send on a closed channel and panic. Stopping
		// the relay also releases the registration when ctx ends first.
		defer signal.Stop(sigChan)

		select {
		case sig := <-sigChan:
			galog.Infof("Got signal: %d, leaving...", sig)
			cancel(sig)
		case <-ctx.Done():
			// Nothing to do, the plugin is already going away.
		}
	}()
}
// handleVMEvent decodes req as a JSON VM event request and reacts to it. Only
// the "shutdown" event is acted upon: it invokes the plugin Stop workflow
// (without cleanup) so the core plugin can shut down gracefully. Any other
// event is logged at debug level and ignored. An error is returned only when
// req cannot be unmarshalled.
func handleVMEvent(ctx context.Context, req []byte) error {
	galog.Debugf("Handling VM event")

	var event manager.Request
	if err := json.Unmarshal(req, &event); err != nil {
		return fmt.Errorf("unmarshal VM event request: %w", err)
	}

	if event.Event == "shutdown" {
		galog.Infof("Shutdown event received, shutting down core plugin...")
		// The result of Stop is deliberately dropped: its response is always
		// empty and its error is always nil.
		pluginServer.Stop(ctx, &pb.StopRequest{Cleanup: false})
		return nil
	}

	galog.Debugf("Ignoring VM event %q", event.Event)
	return nil
}
// PluginServer implements the core-plugin RPC server interface.
type PluginServer struct {
	// server is the grpc server that serves RPC requests for core plugin. It is
	// gracefully stopped when runAgent returns.
	server *grpc.Server
	// cancel is the cancel function to be called when core plugin is stopped. It
	// is set by Start and invoked by Stop; it is nil until Start has run.
	cancel context.CancelFunc
	// This is for compatibility with `protoc`, which requires this be embedded.
	pb.UnimplementedGuestAgentPluginServer
}
// Apply performs the work described by the request payload. The payload is
// unmarshalled as a JSON command request; the only supported command is
// manager.VMEventCmd, which is forwarded to handleVMEvent. A failure while
// handling the VM event is logged and not reported to the caller. A payload
// that cannot be unmarshalled or that carries any other command yields a
// status error.
//
// NOTE(review): the numeric status code 1 used below corresponds to
// codes.Canceled in gRPC - confirm this is the code callers expect.
func (ps *PluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
	galog.Debugf("Handling apply request %+v", msg)

	payload := msg.GetData().GetValue()
	resp := &pb.ApplyResponse{}

	var req command.Request
	if err := json.Unmarshal(payload, &req); err != nil {
		return resp, status.Errorf(1, "failed to unmarshal apply request (%s): %v", string(payload), err)
	}

	if req.Command != manager.VMEventCmd {
		return resp, status.Errorf(1, "unsupported command: %q", req.Command)
	}

	if err := handleVMEvent(ctx, payload); err != nil {
		galog.Errorf("Failed to handle VM event: %v", err)
	}
	return resp, nil
}
// Start is the entry point that makes the plugin functional. It creates the
// plugin context, initializes the logger, registers the metadata watcher with
// the event manager and launches runAgent in the background. A status error is
// returned when the logger cannot be initialized or the watcher cannot be
// added.
func (ps *PluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) {
	// The plugin gets its own context. The one received with the request cannot
	// be used because it may carry timeouts or deadlines set by the Guest Agent,
	// and its lifetime is scoped to the request: once the request is finished
	// the context is cancelled.
	pluginCtx, stop := context.WithCancel(context.Background())
	ps.cancel = stop

	logOpts.ProgramVersion = version
	initErr := logger.Init(pluginCtx, logOpts)
	if initErr != nil {
		return nil, status.Errorf(1, "failed to initialize logger: %v", initErr)
	}
	loggerInitialized.Store(true)

	galog.Debugf("Handling start request %+v", msg)

	evManager := events.FetchManager()
	if err := evManager.AddWatcher(pluginCtx, metadata.NewWatcher()); err != nil {
		return nil, status.Errorf(1, "failed to add metadata watcher: %v", err)
	}

	// The duration of the early initialization steps may vary and the event
	// manager runs for the life-time of the process. None of that should affect
	// the [Start] RPC, so the work is done in another goroutine and the RPC
	// returns right away; otherwise [Start] could time out and fail for the
	// agent. The goroutine exits when the event manager's [Run] method returns.
	go ps.runAgent(pluginCtx)

	return &pb.StartResponse{}, nil
}
// Stop is the stop hook and implements core plugin stop workflow. It shuts
// down galog and cancels the plugin context created by Start. The response is
// always empty and the returned error is always nil.
func (ps *PluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) {
	galog.Infof("Handling stop request %+v, stopping core plugin...", msg)
	galog.Shutdown(galogShutdownTimeout)

	// cancel is only set by Start, but the server already accepts RPCs before
	// Start is called, so Stop (directly or through a shutdown VM event) can
	// arrive first. Calling a nil CancelFunc would panic.
	if ps.cancel != nil {
		ps.cancel()
	}
	return &pb.StopResponse{}, nil
}
// GetStatus is the health check the agent performs to make sure the plugin
// process is alive. When the request data equals initStatusRequest the reply
// reports whether early initialization has completed (code 0) or is still in
// progress (code 1). Any other request is answered with a generic "alive"
// status with code 0.
func (ps *PluginServer) GetStatus(ctx context.Context, msg *pb.GetStatusRequest) (*pb.Status, error) {
	galog.Debugf("Handling get status request %+v", msg)

	if msg.GetData() != initStatusRequest {
		return &pb.Status{Code: 0, Results: []string{"core-plugin-alive, running ok"}}, nil
	}

	if !early.Retrieve().Initialized() {
		return &pb.Status{Code: 1, Results: []string{"still working..."}}, nil
	}
	return &pb.Status{Code: 0, Results: []string{"successfully completed early initialization"}}, nil
}