cmd/ops_agent_uap_plugin/service_linux.go (238 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows
package main
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"google.golang.org/grpc/status"
pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_plugin/google_guest_agent/plugin"
)
// OpsAgentConfigLocationLinux is the absolute path of the Ops Agent config
// file that is handed to the config generator and to the agent wrapper.
const OpsAgentConfigLocationLinux = "/etc/google-cloud-ops-agent/config.yaml"

// Binaries shipped with the plugin. Each path is joined onto the plugin
// install directory before use.
const (
	ConfGeneratorBinary = "libexec/google_cloud_ops_agent_engine"
	AgentWrapperBinary  = "libexec/google_cloud_ops_agent_wrapper"
	FluentbitBinary     = "subagents/fluent-bit/bin/fluent-bit"
	OtelBinary          = "subagents/opentelemetry-collector/otelopscol"
)

// Working directories. Each path is joined onto the plugin state directory
// before use.
const (
	LogsDirectory = "log/google-cloud-ops-agent"
	// FluentBitStateDiectory keeps its historical (misspelled) name so that
	// existing references continue to compile.
	FluentBitStateDiectory    = "state/fluent-bit"
	FluentBitRuntimeDirectory = "run/google-cloud-ops-agent-fluent-bit"
	OtelRuntimeDirectory      = "run/google-cloud-ops-agent-opentelemetry-collector"
)

// DefaultPluginStateDirectory is the state directory used when the Start
// request does not carry one.
const DefaultPluginStateDirectory = "/var/lib/google-guest-agent/agent_state/plugins/ops-agent-plugin"
// AgentServiceNameRegex matches systemd unit names of the form
// "<name>.service", where <name> is made of word characters and dashes.
var AgentServiceNameRegex = regexp.MustCompile(`[\w-]+\.service`)

// AgentSystemdServiceNames lists the systemd units that Start looks for when
// checking for pre-existing agent installations.
var AgentSystemdServiceNames = []string{
	"google-cloud-ops-agent.service",
	"stackdriver-agent.service",
	"google-fluentd.service",
}
// RunSubAgentCommandFunc defines a function type that starts a subagent. If
// one subagent execution exits, the other subagents are also terminated via
// context cancellation. This abstraction is introduced primarily to
// facilitate testing by allowing the injection of mock implementations.
type RunSubAgentCommandFunc func(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup)
// Apply applies the config sent or performs the work defined in the message.
// The ApplyRequest is opaque to the agent and is expected to be a well-known
// contract between the plugin and the server, e.g. a request to enable or
// disable a feature.
//
// This implementation ignores both ctx and msg and always returns an empty
// response with a nil error.
func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
	return new(pb.ApplyResponse), nil
}
// Start starts the plugin and initiates the plugin functionality.
// Until the plugin receives a Start request it is expected to be not
// functioning and just listening on the address handed off, waiting for the
// request.
//
// Start is a no-op (empty response, nil error) when ps.cancel is already set.
// Otherwise it:
//  1. records the run by storing a fresh cancel function in ps.cancel,
//  2. resolves the plugin install directory from the running executable,
//  3. fails if findPreExistentAgents reports conflicting installations,
//  4. writes the config via writeCustomConfigToFile, validates it and
//     generates the subagent configs,
//  5. launches runSubagents in a background goroutine.
//
// Every failure after step 1 rolls the recorded run back through Stop and is
// returned as a gRPC status error (Internal or FailedPrecondition).
func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) {
	// Numeric gRPC status codes, kept as untyped constants because the grpc
	// codes package is not imported by this file.
	const (
		codeFailedPrecondition = 9
		codeInternal           = 13
	)

	ps.mu.Lock()
	if ps.cancel != nil {
		log.Printf("The Ops Agent plugin is started already, skipping the current request")
		ps.mu.Unlock()
		return &pb.StartResponse{}, nil
	}
	log.Printf("Received a Start request: %s. Starting the Ops Agent", msg)
	pContext, cancel := context.WithCancel(context.Background())
	ps.cancel = cancel
	ps.mu.Unlock()

	// abort undoes the "started" state recorded above and hands grpcErr back
	// to the caller.
	abort := func(grpcErr error) (*pb.StartResponse, error) {
		ps.Stop(ctx, &pb.StopRequest{Cleanup: false})
		return nil, grpcErr
	}

	pluginInstallPath, err := os.Executable()
	if err == nil {
		pluginInstallPath, err = filepath.EvalSymlinks(pluginInstallPath)
	}
	if err != nil {
		log.Printf("Start() failed, because it cannot determine the plugin install location: %s", err)
		return abort(status.Error(codeInternal, err.Error()))
	}
	pluginInstallDir := filepath.Dir(pluginInstallPath)

	pluginStateDir := msg.GetConfig().GetStateDirectoryPath()
	if pluginStateDir == "" {
		pluginStateDir = DefaultPluginStateDirectory
	}

	// Find existing ops agent installation, and conflicting legacy agent installation.
	foundConflictingInstallations, err := findPreExistentAgents(pContext, ps.runCommand, AgentSystemdServiceNames)
	if err == nil && foundConflictingInstallations {
		// A positive result without an error must still fail the request;
		// synthesize the error so err.Error() below never dereferences nil.
		err = fmt.Errorf("conflicting agent installations identified")
	}
	if err != nil {
		log.Printf("Start() failed: %s", err)
		return abort(status.Error(codeFailedPrecondition, err.Error()))
	}

	// Receive config from the Start request and write it to the Ops Agent config file.
	if err := writeCustomConfigToFile(msg, OpsAgentConfigLocationLinux); err != nil {
		log.Printf("Start() failed: %s", err)
		return abort(status.Errorf(codeInternal, "failed to write the custom Ops Agent config to file: %s", err))
	}

	// Ops Agent config validation
	if err := validateOpsAgentConfig(pContext, pluginInstallDir, pluginStateDir, ps.runCommand); err != nil {
		log.Printf("Start() failed: %s", err)
		return abort(status.Errorf(codeFailedPrecondition, "failed to validate Ops Agent config: %s", err))
	}

	// Subagent config generation
	if err := generateSubagentConfigs(pContext, ps.runCommand, pluginInstallDir, pluginStateDir); err != nil {
		log.Printf("Start() failed: %s", err)
		return abort(status.Errorf(codeFailedPrecondition, "failed to generate subagent configs: %s", err))
	}

	// the subagent startups
	cancelFunc := func() {
		// Within this file pContext is cancelled only by Stop, which also
		// clears ps.cancel. If pContext is already cancelled, this run has
		// been stopped and ps.cancel (if set) belongs to a newer Start;
		// calling Stop here would tear that newer run down.
		if pContext.Err() != nil {
			return
		}
		ps.Stop(ctx, &pb.StopRequest{Cleanup: false})
	}
	go runSubagents(pContext, cancelFunc, pluginInstallDir, pluginStateDir, runSubAgentCommand, ps.runCommand)
	return &pb.StartResponse{}, nil
}
// Stop is the stop hook and implements any cleanup if required.
// Stop may be called if the plugin revision is being changed, e.g. so that
// the plugin can stop a task it was performing or remove some state before
// exiting.
//
// While holding ps.mu, Stop invokes the cancel function stored in ps.cancel
// (if any) and then clears it. When nothing is stored the request is only
// logged. ctx is unused and msg is only logged; the response is always empty
// with a nil error.
func (ps *OpsAgentPluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if stop := ps.cancel; stop != nil {
		log.Printf("Received a Stop request: %s. Stopping the Ops Agent", msg)
		stop()
		ps.cancel = nil
	} else {
		log.Printf("The Ops Agent plugin is stopped already, skipping the current request")
	}
	return &pb.StopResponse{}, nil
}
// GetStatus is the health check the agent performs to make sure the plugin
// process is alive. If the request fails the process is considered dead and
// relaunched. Plugins can share additional information, e.g. non-fatal errors
// that prevent some features, in the status that the agent sends back to the
// service.
//
// The plugin is reported as running (Code 0) while ps.cancel is set and as
// not running (Code 1) otherwise. The returned error is always nil.
func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatusRequest) (*pb.Status, error) {
	log.Println("Received a GetStatus request")

	ps.mu.Lock()
	defer ps.mu.Unlock()

	// Start from the "not running" report and upgrade it when a run is recorded.
	logLine := "The Ops Agent plugin is not running"
	report := &pb.Status{Code: 1, Results: []string{"The Ops Agent Plugin is not running."}}
	if ps.cancel != nil {
		logLine = "The Ops Agent plugin is running"
		report = &pb.Status{Code: 0, Results: []string{"The Ops Agent Plugin is running ok."}}
	}
	log.Println(logLine)
	return report, nil
}
// runSubagents starts the otel and fluent bit subagents, each in its own
// goroutine, and blocks until the WaitGroup handed to every
// runSubAgentCommand call is released.
//
// Both commands are created with exec.CommandContext on the same ctx, so
// cancelling ctx applies to both of them. Subagents are never restarted here;
// runSubAgentCommand receives cancel so that it can end the whole run once
// its command returns.
//
// ctx: the context shared by both subagent commands and the signal handler.
//
// cancel: invoked when a termination signal is received, and passed on to
// every runSubAgentCommand call. In Start it is wired to Stop, after which
// GetStatus reports the plugin as not running.
//
// pluginInstallDirectory / pluginStateDirectory: roots onto which the binary
// and config/log/state paths are joined.
//
// runSubAgentCommand / runCommand: injectable launch and execution hooks.
func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallDirectory string, pluginStateDirectory string, runSubAgentCommand RunSubAgentCommandFunc, runCommand RunCommandFunc) {
	// Cancel the whole run when a termination signal arrives.
	sigHandler(ctx, func(os.Signal) { cancel() })

	// Otel collector invocation.
	otelArgs := []string{
		"--config", path.Join(pluginStateDirectory, OtelRuntimeDirectory, "otel.yaml"),
	}
	otelCmd := exec.CommandContext(ctx, path.Join(pluginInstallDirectory, OtelBinary), otelArgs...)

	// Fluent Bit is launched through the agent wrapper binary.
	fluentBitArgs := []string{
		"-config_path", OpsAgentConfigLocationLinux,
		"-log_path", path.Join(pluginStateDirectory, LogsDirectory, "subagents/logging-module.log"),
		path.Join(pluginInstallDirectory, FluentbitBinary),
		"--config", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory, "fluent_bit_main.conf"),
		"--parser", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory, "fluent_bit_parser.conf"),
		"--storage_path", path.Join(pluginStateDirectory, FluentBitStateDiectory, "buffers"),
	}
	fluentBitCmd := exec.CommandContext(ctx, path.Join(pluginInstallDirectory, AgentWrapperBinary), fluentBitArgs...)

	var wg sync.WaitGroup
	for _, subagentCmd := range []*exec.Cmd{otelCmd, fluentBitCmd} {
		wg.Add(1)
		go runSubAgentCommand(ctx, cancel, subagentCmd, runCommand, &wg)
	}
	wg.Wait()
}
// runSubAgentCommand executes cmd through runCommand and, once it returns,
// calls cancel so that the other subagents sharing the parent context are
// stopped as well. The command is not restarted, whether it succeeded or
// failed.
//
// Nothing is executed and cancel is not called when cmd is nil or when ctx
// is already cancelled. wg.Done is called on every path.
func runSubAgentCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) {
	defer wg.Done()

	switch {
	case cmd == nil:
		return
	case ctx.Err() != nil:
		// The run was cancelled before this subagent got a chance to start.
		log.Printf("cannot execute command: %s, because the context has been cancelled", cmd.Args)
		return
	}

	output, err := runCommand(cmd)
	if err == nil {
		log.Printf("command: %s %s exited successfully.\nCommand output: %s", cmd.Path, cmd.Args, string(output))
	} else {
		log.Printf("command: %s exited with errors, not restarting.\nCommand output: %s\n Command error:%s", cmd.Args, string(output), err)
	}

	// Cancels the parent context, which also stops the other Ops Agent sub-binaries.
	cancel()
}
// sigHandler handles SIGTERM, SIGINT, SIGQUIT and SIGHUP. The function
// provided in the cancel argument handles internal framework termination and
// the plugin interface notification of the "exiting" state.
//
// A background goroutine waits for the first of those signals or for ctx to
// be done, whichever happens first. On a signal it logs it and calls cancel
// with it exactly once. In both cases the signal registration is released
// with signal.Stop before the goroutine exits.
//
// The notification channel is deliberately never closed: it stays registered
// until signal.Stop runs, and a signal delivered to a closed channel would
// panic inside the signal package.
func sigHandler(ctx context.Context, cancel func(sig os.Signal)) {
	// Buffered so that a signal arriving before the goroutine is scheduled is not dropped.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGHUP)
	go func() {
		// Unregister on every exit path so that repeated calls do not
		// accumulate registrations.
		defer signal.Stop(sigChan)
		select {
		case sig := <-sigChan:
			log.Printf("Got signal: %d, leaving...", sig)
			cancel(sig)
		case <-ctx.Done():
		}
	}()
}
// runCommand runs cmd to completion and returns its combined stdout and
// stderr together with the execution error, if any. A nil cmd yields an
// empty string and a nil error.
//
// cmd.SysProcAttr is replaced so that the child is sent SIGKILL when its
// parent dies (Pdeathsig). Failures are logged here and also returned.
func runCommand(cmd *exec.Cmd) (string, error) {
	if cmd == nil {
		return "", nil
	}

	cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}

	log.Printf("Running command: %s", cmd.Args)
	raw, runErr := cmd.CombinedOutput()
	combined := string(raw)
	if runErr != nil {
		log.Printf("Command %s failed, \ncommand output: %s\ncommand error: %s", cmd.Args, combined, runErr)
	}
	return combined, runErr
}
// validateOpsAgentConfig runs the config generator binary from
// pluginInstallDirectory against the Ops Agent config file, passing only the
// "-in" and "-logs" flags (the logs directory lives under
// pluginStateDirectory).
//
// It returns nil when runCommand succeeds. Otherwise it returns an error
// that carries the command output and wraps the underlying error, so callers
// can still inspect it with errors.Is / errors.As.
func validateOpsAgentConfig(ctx context.Context, pluginInstallDirectory string, pluginStateDirectory string, runCommand RunCommandFunc) error {
	configValidationCmd := exec.CommandContext(ctx,
		path.Join(pluginInstallDirectory, ConfGeneratorBinary),
		"-in", OpsAgentConfigLocationLinux,
		"-logs", path.Join(pluginStateDirectory, LogsDirectory),
	)
	output, err := runCommand(configValidationCmd)
	if err != nil {
		return fmt.Errorf("failed to validate the Ops Agent config:\ncommand output: %s\ncommand error: %w", output, err)
	}
	return nil
}
// generateSubagentConfigs invokes the config generator binary twice: first
// with "-service otel", then with "-service fluentbit". Each run reads the
// Ops Agent config file and writes into the matching runtime directory under
// pluginStateDirectory; the fluentbit run additionally receives the Fluent
// Bit state directory via "-state".
//
// Generation stops at the first failing command. The returned error carries
// that command's output and wraps the underlying error so that callers can
// inspect it with errors.Is / errors.As.
func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, pluginInstallDirectory string, pluginStateDirectory string) error {
	confGeneratorBinaryFullPath := path.Join(pluginInstallDirectory, ConfGeneratorBinary)
	logsDirectory := path.Join(pluginStateDirectory, LogsDirectory)

	otelConfigGenerationCmd := exec.CommandContext(ctx,
		confGeneratorBinaryFullPath,
		"-service", "otel",
		"-in", OpsAgentConfigLocationLinux,
		"-out", path.Join(pluginStateDirectory, OtelRuntimeDirectory),
		"-logs", logsDirectory,
	)
	if output, err := runCommand(otelConfigGenerationCmd); err != nil {
		return fmt.Errorf("failed to generate Otel config:\ncommand output: %s\ncommand error: %w", output, err)
	}

	fluentBitConfigGenerationCmd := exec.CommandContext(ctx,
		confGeneratorBinaryFullPath,
		"-service", "fluentbit",
		"-in", OpsAgentConfigLocationLinux,
		"-out", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory),
		"-logs", logsDirectory,
		"-state", path.Join(pluginStateDirectory, FluentBitStateDiectory),
	)
	if output, err := runCommand(fluentBitConfigGenerationCmd); err != nil {
		return fmt.Errorf("failed to generate Fluent Bit config:\ncommand output: %s\ncommand error: %w", output, err)
	}
	return nil
}
// findPreExistentAgents runs "systemctl list-unit-files" for the given unit
// names and reports whether any of them is listed.
//
// Results:
//   - (false, nil) when the output contains "0 unit files listed." or no
//     "<name>.service" entry;
//   - (false, err) when the command failed for another reason; err wraps the
//     underlying error so callers can inspect it with errors.Is / errors.As;
//   - (true, err) when at least one unit is listed; err names the units.
func findPreExistentAgents(ctx context.Context, runCommand RunCommandFunc, agentSystemdServiceNames []string) (bool, error) {
	cmdArgs := append([]string{"list-unit-files"}, agentSystemdServiceNames...)
	findOpsAgentCmd := exec.CommandContext(ctx, "systemctl", cmdArgs...)

	output, err := runCommand(findOpsAgentCmd)
	// Checked before err on purpose: an empty listing counts as "nothing
	// installed" even if the command also returned an error.
	// NOTE(review): presumably systemctl exits non-zero when no unit matches — confirm.
	if strings.Contains(output, "0 unit files listed.") {
		return false, nil
	}
	if err != nil {
		return false, fmt.Errorf("unable to verify the existing Ops Agent and legacy agent installations, error: %w", err)
	}

	alreadyInstalledAgents := AgentServiceNameRegex.FindAllString(output, -1)
	if len(alreadyInstalledAgents) == 0 {
		return false, nil
	}
	log.Printf("The following systemd services are already installed on the VM: %v\n command output: %v", alreadyInstalledAgents, output)
	return true, fmt.Errorf("conflicting installations identified: %v", alreadyInstalledAgents)
}