cmd/core_plugin/wsfchealthcheck/agent.go (125 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wsfchealthcheck
import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/galog"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/retry"
)
// policy is the retry policy stop uses while polling for the agent to report
// that it is no longer running.
var policy = retry.Policy{
	MaxAttempts:   5,
	BackoffFactor: 1,
	Jitter:        time.Second,
}
// healthCheck describes the behaviour required from an agent that answers
// health check pings.
type healthCheck interface {
	// isRunning reports whether the agent is currently running.
	isRunning() bool
	// address returns the address on which the agent listens for health check
	// pings.
	address() string
	// setAddress changes the address on which the agent listens for health
	// check pings.
	setAddress(addr string)
	// run starts the agent so that it begins listening for requests.
	run(ctx context.Context) error
	// stop makes the agent stop listening for requests.
	stop(ctx context.Context) error
}
// connectOpts contains net connection config. Both fields are handed to
// net.Listen unchanged when the agent is started.
type connectOpts struct {
	// protocol is the protocol of the connection. It must be either tcp or uds,
	// where UDS must be used *only* for unit testing.
	protocol string
	// addr is the address of the connection: just a port number for TCP and a
	// socket path for UDS.
	// NOTE(review): net.Listen expects a "host:port" style address for TCP;
	// confirm callers format the port accordingly.
	addr string
}
// wsfcAgent implements the healthCheck interface.
type wsfcAgent struct {
	// running tracks agent status. run sets it to true once the listener has
	// been created and the accept goroutine resets it to false when it exits.
	running atomic.Bool
	// opts contains net connection config.
	opts connectOpts
	// listener is where this agent is listening on. It is assigned in run and
	// set back to nil when the accept goroutine exits.
	// NOTE(review): stop reads this field while the accept goroutine may clear
	// it and no lock is visible here; confirm callers serialize run and stop.
	listener net.Listener
}
// newWSFCAgent returns a wsfcAgent configured with the given connection
// options. The returned agent is not running and has no listener yet.
func newWSFCAgent(opts connectOpts) *wsfcAgent {
	agent := new(wsfcAgent)
	agent.opts = opts
	return agent
}
// isRunning reports whether the agent is currently running, as recorded by
// the atomic running flag.
func (w *wsfcAgent) isRunning() bool {
	running := w.running.Load()
	return running
}
// address returns the address currently stored in the agent's connection
// options.
func (w *wsfcAgent) address() string {
	current := w.opts.addr
	return current
}
// setAddress replaces the address stored in the agent's connection options
// with addr, logging both the previous and the new value. It only updates the
// stored option; it does not touch any existing listener.
func (w *wsfcAgent) setAddress(addr string) {
	previous := w.address()
	galog.Infof("Re-setting address from %q -> %q", previous, addr)
	w.opts.addr = addr
}
// run starts the agent listening on the protocol and address configured in
// [connectOpts].
//
// If the agent is already running the call is ignored and nil is returned.
// Otherwise a listener is created and an accept loop is started in a
// background goroutine; every accepted connection is handled in its own
// goroutine by handleHealthCheckRequest. An error is returned only when the
// listener cannot be created.
//
// The accept loop ends when ctx is cancelled or when the listener is closed
// (for example by stop). In both cases the listener is closed, w.listener is
// reset to nil and the running flag is cleared.
func (w *wsfcAgent) run(ctx context.Context) error {
	if w.isRunning() {
		galog.Debugf("wsfc agent is already running, ignoring run request")
		return nil
	}

	galog.Infof("Starting wsfc agent on %+v", w.opts)

	// Keep the listener in a local variable so the goroutines below never
	// depend on the w.listener field, which is cleared when the loop exits.
	listener, err := net.Listen(w.opts.protocol, w.opts.addr)
	if err != nil {
		return fmt.Errorf("failed to start listener on %q (%s): %w", w.opts.addr, w.opts.protocol, err)
	}
	w.listener = listener
	w.running.Store(true)

	// stopped is closed when the accept loop exits so that the cancellation
	// watcher below does not outlive it.
	stopped := make(chan struct{})

	// Accept takes no context and blocks until a connection arrives or the
	// listener is closed, so close the listener on cancellation to unblock it.
	go func() {
		select {
		case <-ctx.Done():
			if err := listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
				galog.Errorf("Failed to close listener: %v", err)
			}
		case <-stopped:
		}
	}()

	// Go routine for listening requests. This keeps running until context is
	// cancelled or the underlying listener is closed.
	go func() {
		// Release the listener and reset state while returning from this go
		// routine to indicate that agent is stopped.
		defer func() {
			close(stopped)
			// The listener may already have been closed by stop() or by the
			// cancellation watcher; net.ErrClosed is expected in that case.
			if err := listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
				galog.Errorf("Failed to close listener: %v", err)
			}
			// Clear the field before the flag so a later run() only proceeds
			// once the old listener reference is gone.
			w.listener = nil
			w.running.Store(false)
		}()

		for ctx.Err() == nil {
			conn, err := listener.Accept()
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					// Its ok to simply return and exit on error as wsfc agent manager
					// will restart the agent if wsfc is still enabled.
					galog.Infof("Listener is closed, stopping agent...")
					return
				}
				// If connection is not closed and there's some other error just log and
				// retry.
				galog.Errorf("Failed to accept connection with error: %v", err)
				continue
			}

			go func() {
				if err := w.handleHealthCheckRequest(ctx, conn); err != nil {
					galog.Errorf("Failed to handle health check request: %v", err)
				}
			}()
		}
	}()

	return nil
}
// handleHealthCheckRequest serves a single health check request on conn.
//
// It reads up to 1024 bytes from conn with a single Read, trims surrounding
// whitespace and passes the result to checkIPExist as the IP to look up. The
// reply from checkIPExist is then written back on conn. A one second deadline
// is set on conn before reading, and conn is always closed before returning.
//
// An error is returned when the deadline cannot be set, the read fails, the
// IP check fails, or the reply is not written completely.
func (w *wsfcAgent) handleHealthCheckRequest(ctx context.Context, conn net.Conn) error {
	galog.Debugf("Handling WSFC health check request")

	defer func() {
		if err := conn.Close(); err != nil {
			galog.Errorf("Failed to close a connection: %v", err)
		}
	}()

	// Without a deadline a silent peer could block this goroutine forever, so
	// do not continue if it cannot be applied.
	if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
		return fmt.Errorf("failed to set connection deadline: %w", err)
	}

	buf := make([]byte, 1024)
	reqLen, err := conn.Read(buf)
	if err != nil {
		return fmt.Errorf("failed to read from connection: %w", err)
	}

	wsfcIP := strings.TrimSpace(string(buf[:reqLen]))
	reply, err := checkIPExist(ctx, wsfcIP)
	if err != nil {
		return fmt.Errorf("failed to check IP %q existence: %w", wsfcIP, err)
	}
	galog.Debugf("IP existence check for %q returned %q", wsfcIP, reply)

	writeBytes := []byte(reply)
	wrote, err := conn.Write(writeBytes)
	if err != nil {
		return fmt.Errorf("writing to connection: bytes written = %d, err = %w, expected bytes = %d", wrote, err, len(writeBytes))
	}
	// Reported separately from the case above because there is no error to
	// wrap here; formatting a nil error with %w yields a malformed message.
	if wrote != len(writeBytes) {
		return fmt.Errorf("writing to connection: short write, bytes written = %d, expected bytes = %d", wrote, len(writeBytes))
	}
	return nil
}
// stop stops the agent.
//
// If the agent is not running the call is ignored and nil is returned.
// Otherwise the listener is closed, which makes the accept goroutine started
// by run exit, and stop then polls isRunning using the package retry policy.
// It returns nil once the agent reports stopped, or the error from retry.Run
// if the agent is still running when the policy is exhausted.
func (w *wsfcAgent) stop(ctx context.Context) error {
	if !w.isRunning() {
		galog.Debugf("wsfc agent is already stopped, ignoring stop request")
		return nil
	}

	galog.Infof("Stopping wsfc agent")

	// Take a snapshot of the field: the accept goroutine sets w.listener to
	// nil when it exits, so re-reading it after the nil check could
	// dereference nil.
	if listener := w.listener; listener != nil {
		// A listener that is already closed is not a failure for stop.
		if err := listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
			galog.Errorf("Failed to close listener: %v", err)
		}
	}

	// Wait to confirm if agent has stopped or give up with error after retry
	// policy exhausts.
	return retry.Run(ctx, policy, func() error {
		if w.isRunning() {
			return errors.New("agent is still running")
		}
		return nil
	})
}