internal/command/command_monitor.go (226 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
* This file contains the details of command's internal communication protocol
* listener. Most callers should not need to call anything in this file. The
* command handler and caller API is contained in command.go.
*/
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/galog"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/cfg"
)
const (
// Fallback constants should be the same as the default values in the cfg
// package.
fallbackTimeout = time.Duration(10) * time.Second
fallbackPipeMode = 0770
)
// cmdMonitor holds the registered command handlers. It does not hold the
// server listener.
var cmdMonitor *Monitor = &Monitor{
handlers: make(map[string]Handler),
}
func parseTimeoutFromCfg() time.Duration {
timeout, err := time.ParseDuration(cfg.Retrieve().Unstable.CommandRequestTimeout)
if err != nil {
galog.Errorf("commmand request timeout configuration is not a valid duration string, falling back to default 10s timeout")
return fallbackTimeout
}
return timeout
}
func parsePipemodeFromCfg() int {
pipemode, err := strconv.ParseInt(cfg.Retrieve().Unstable.CommandPipeMode, 8, 32)
if err != nil {
galog.Errorf("could not parse command_pipe_mode as octal integer: %v falling back to default mode 0770", err)
return fallbackPipeMode
}
return int(pipemode)
}
// Setup starts an internally managed command server. The agent configuration
// will decide the server options.
func Setup(ctx context.Context, listener KnownListeners) error {
// Only setup command monitor if it is enabled in the configuration file.
if !cfg.Retrieve().Unstable.CommandMonitorEnabled {
galog.Debug("Command monitor is disabled in the configuration file, skipping setup")
return nil
}
galog.Debugf("Setting up command monitor for %v", listener)
if cmdMonitor.srv != nil {
return fmt.Errorf("command monitor is already running")
}
if err := cmdMonitor.RegisterHandler(getOptionCommand, getOption); err != nil {
galog.Errorf("Could not register command handler for %s: %v", getOptionCommand, err)
}
timeout := parseTimeoutFromCfg()
pipemode := parsePipemodeFromCfg()
cmdMonitor.srv = &Server{
pipe: PipeName(listener),
pipeMode: int(pipemode),
pipeGroup: cfg.Retrieve().Unstable.CommandPipeGroup,
timeout: timeout,
monitor: cmdMonitor,
}
return cmdMonitor.srv.start(ctx)
}
// Close will close the internally managed command server, if it was
// initialized.
func Close(_ context.Context) {
if err := cmdMonitor.UnregisterHandler(getOptionCommand); err != nil {
galog.Errorf("Could not unregister command handler for %s: %v", getOptionCommand, err)
}
if cmdMonitor.srv != nil {
if err := cmdMonitor.srv.Close(); err != nil {
galog.Errorf("error closing command-monitor: %v", err)
}
cmdMonitor.srv = nil
}
}
// Monitor is the structure which handles command registration and
// deregistration.
type Monitor struct {
srv *Server
handlersMu sync.RWMutex
handlers map[string]Handler
}
// Close stops the server from listening to commands.
func (m *Monitor) Close() error { return m.srv.Close() }
// Start begins listening for commands.
func (m *Monitor) Start(ctx context.Context) error { return m.srv.start(ctx) }
// Server is the server structure which will listen for command requests and
// route them to handlers. Most callers should not interact with this directly.
type Server struct {
pipe string
pipeMode int
pipeGroup string
timeout time.Duration
srv net.Listener
monitor *Monitor
}
// Close signals the server to stop listening for commands and stop waiting to
// listen.
func (c *Server) Close() error {
if c.srv != nil {
return c.srv.Close()
}
return nil
}
func readOrError(c net.Conn) ([]byte, bool) {
b := make([]byte, 1024)
n, err := c.Read(b)
if err == nil {
return b[:n], true
}
if errors.Is(err, os.ErrDeadlineExceeded) {
if e, err := json.Marshal(TimeoutError); err == nil {
c.Write(e)
return nil, false
}
} else {
if e, err := json.Marshal(ConnError); err == nil {
c.Write(e)
return nil, false
}
}
c.Write(internalError)
return nil, false
}
func (c *Server) start(ctx context.Context) error {
if c.srv != nil {
return errors.New("server already listening")
}
galog.Debugf("Starting command server at %q", c.pipe)
srv, err := listen(ctx, c.pipe, c.pipeMode, c.pipeGroup)
if err != nil {
return err
}
go func() {
defer srv.Close()
for {
if ctx.Err() != nil {
return
}
conn, err := srv.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
break
}
galog.Errorf("error on connection to pipe %s: %v", c.pipe, err)
continue
}
go func(conn net.Conn) {
defer conn.Close()
// Go has lots of helpers to read json from an io.Reader but none of
// them return the byte slice afterwards and we need it for the handler.
deadline := time.Now().Add(c.timeout)
if err := conn.SetDeadline(deadline); err != nil {
galog.Errorf("could not set deadline on command request: %v", err)
return
}
message, ok := readOrError(conn)
if !ok {
return
}
var req Request
for err := json.Unmarshal(message, &req); err != nil; err = json.Unmarshal(message, &req) {
b, ok := readOrError(conn)
if !ok {
return
}
message = append(message, b...)
}
c.monitor.handlersMu.RLock()
defer c.monitor.handlersMu.RUnlock()
handler, ok := c.monitor.handlers[req.Command]
if !ok {
if b, err := json.Marshal(CmdNotFoundError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
return
}
resp, err := handler(ctx, message)
if err != nil {
re := Response{Status: HandlerError.Status, StatusMessage: err.Error()}
if b, err := json.Marshal(re); err != nil {
resp = internalError
} else {
resp = b
}
}
conn.Write(resp)
}(conn)
}
}()
c.srv = srv
return nil
}
// getOptionRequest is a request to get a config value by name. Guest agent
// code should use the cfg package, this is intended for use outside of the
// agent by the guest environment.
type getOptionRequest struct {
Request
Option string
}
// getOptionResponse is a response to a getOptionRequest with the config value.
type getOptionResponse struct {
Response
Option string
Value string
}
// getOptionCommand is the command name for getOption.
const getOptionCommand = "agent.config.getoption"
var validKey = regexp.MustCompile("^[A-Z][a-zA-Z]+$")
// getOption processes getOptionRequests encoded as json coming from the
// command monitor.
func getOption(_ context.Context, b []byte) ([]byte, error) {
var req getOptionRequest
if err := json.Unmarshal(b, &req); err != nil {
return nil, err
}
var resp getOptionResponse
resp.Option = req.Option
if req.Option == "" {
resp.Status = 2
resp.StatusMessage = "No option specified"
return json.Marshal(resp)
}
var opt any
opt = cfg.Retrieve()
for _, k := range strings.Split(resp.Option, ".") {
if !validKey.MatchString(k) {
resp.Status = 3
resp.StatusMessage = "Invalid option, key names must start with uppercase"
return json.Marshal(resp)
}
field := reflect.Indirect(reflect.ValueOf(opt)).FieldByName(k)
if !field.IsValid() {
resp.Status = 1
resp.StatusMessage = "Option does not exist"
return json.Marshal(resp)
}
opt = field.Interface()
}
resp.Value = fmt.Sprintf("%v", opt)
return json.Marshal(resp)
}