google_guest_agent/command/command_monitor.go (183 lines of code) (raw):
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
* This file contains the details of command's internal communication protocol
* listener. Most callers should not need to call anything in this file. The
* command handler and caller API is contained in command.go.
*/
package command
import (
"bufio"
"context"
"encoding/json"
"errors"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/GoogleCloudPlatform/guest-agent/google_guest_agent/cfg"
"github.com/GoogleCloudPlatform/guest-logging-go/logger"
)
var cmdMonitor *Monitor = &Monitor{
handlersMu: new(sync.RWMutex),
handlers: make(map[string]Handler),
}
// Init starts an internally managed command server. The agent configuration
// will decide the server options. Returns a reference to the internally managed
// command monitor which the caller can Close() when appropriate.
func Init(ctx context.Context) {
if cmdMonitor.srv != nil {
return
}
pipe := cfg.Get().Unstable.CommandPipePath
if pipe == "" {
pipe = DefaultPipePath
}
to, err := time.ParseDuration(cfg.Get().Unstable.CommandRequestTimeout)
if err != nil {
logger.Errorf("commmand request timeout configuration is not a valid duration string, falling back to 10s timeout")
to = time.Duration(10) * time.Second
}
pipemode, err := strconv.ParseInt(cfg.Get().Unstable.CommandPipeMode, 8, 32)
if err != nil {
logger.Errorf("could not parse command_pipe_mode as octal integer: %v falling back to mode 0770", err)
pipemode = 0770
}
cmdMonitor.srv = &Server{
pipe: pipe,
pipeMode: int(pipemode),
pipeGroup: cfg.Get().Unstable.CommandPipeGroup,
timeout: to,
monitor: cmdMonitor,
}
err = cmdMonitor.srv.start(ctx)
if err != nil {
logger.Errorf("failed to start command server: %s", err)
}
}
// Close will close the internally managed command server, if it was initialized.
func Close() error {
if cmdMonitor.srv != nil {
return cmdMonitor.srv.Close()
}
return nil
}
// Monitor is the structure which handles command registration and deregistration.
type Monitor struct {
srv *Server
handlersMu *sync.RWMutex
handlers map[string]Handler
}
// Close stops the server from listening to commands.
func (m *Monitor) Close() error { return m.srv.Close() }
// Start begins listening for commands.
func (m *Monitor) Start(ctx context.Context) error { return m.srv.start(ctx) }
// Server is the server structure which will listen for command requests and
// route them to handlers. Most callers should not interact with this directly.
type Server struct {
pipe string
pipeMode int
pipeGroup string
timeout time.Duration
srv net.Listener
monitor *Monitor
}
// Close signals the server to stop listening for commands and stop waiting to
// listen.
func (c *Server) Close() error {
if c.srv != nil {
return c.srv.Close()
}
return nil
}
func (c *Server) start(ctx context.Context) error {
if c.srv != nil {
return errors.New("server already listening")
}
srv, err := listen(ctx, c.pipe, c.pipeMode, c.pipeGroup)
if err != nil {
return err
}
go func() {
defer srv.Close()
for {
if ctx.Err() != nil {
return
}
conn, err := srv.Accept()
if err != nil {
if err == net.ErrClosed {
break
}
logger.Infof("error on connection to pipe %s: %v", c.pipe, err)
continue
}
go func(conn net.Conn) {
defer conn.Close()
// Go has lots of helpers to do this for us but none of them return the byte
// slice afterwards, and we need it for the handler
var b []byte
r := bufio.NewReader(conn)
var depth int
deadline := time.Now().Add(c.timeout)
e := conn.SetReadDeadline(deadline)
if e != nil {
logger.Infof("could not set read deadline on command request: %v", e)
return
}
for {
if time.Now().After(deadline) {
if b, err := json.Marshal(TimeoutError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
return
}
rune, _, err := r.ReadRune()
if err != nil {
logger.Debugf("connection read error: %v", err)
if errors.Is(err, os.ErrDeadlineExceeded) {
if b, err := json.Marshal(TimeoutError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
} else {
if b, err := json.Marshal(ConnError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
}
return
}
b = append(b, byte(rune))
switch rune {
case '{':
depth++
case '}':
depth--
}
// Must check here because the first pass always depth = 0
if depth == 0 {
break
}
}
var req Request
err := json.Unmarshal(b, &req)
if err != nil {
if b, err := json.Marshal(BadRequestError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
return
}
c.monitor.handlersMu.RLock()
defer c.monitor.handlersMu.RUnlock()
handler, ok := c.monitor.handlers[req.Command]
if !ok {
if b, err := json.Marshal(CmdNotFoundError); err != nil {
conn.Write(internalError)
} else {
conn.Write(b)
}
return
}
resp, err := handler(b)
if err != nil {
re := Response{Status: HandlerError.Status, StatusMessage: err.Error()}
if b, err := json.Marshal(re); err != nil {
resp = internalError
} else {
resp = b
}
}
conn.Write(resp)
}(conn)
}
}()
c.srv = srv
return nil
}