cmd/ops_agent_uap_plugin/service_windows.go (309 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build windows // +build windows package main import ( "context" "fmt" "log" "os" "os/exec" "path" "path/filepath" "sync" "unsafe" "github.com/GoogleCloudPlatform/ops-agent/apps" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/internal/healthchecks" "github.com/GoogleCloudPlatform/ops-agent/internal/logs" "github.com/GoogleCloudPlatform/ops-agent/internal/self_metrics" "github.com/kardianos/osext" "golang.org/x/sys/windows" "golang.org/x/sys/windows/svc/debug" "golang.org/x/sys/windows/svc/eventlog" "golang.org/x/sys/windows/svc/mgr" "google.golang.org/grpc/status" pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_plugin/google_guest_agent/plugin" ) const ( GeneratedConfigsOutDir = "generated_configs" LogsDirectory = "log" RuntimeDirectory = "run" OpsAgentUAPPluginEventID uint32 = 8 WindowsEventLogIdentifier = "google-cloud-ops-agent-uap-plugin" WindowJobHandleIdentifier = "google-cloud-ops-agent-uap-plugin-job-handle" AgentWrapperBinary = "google-cloud-ops-agent-wrapper.exe" FluentbitBinary = "fluent-bit.exe" OtelBinary = "google-cloud-metrics-agent_windows_amd64.exe" ) var ( AgentWindowsServiceName = []string{"StackdriverLogging", "StackdriverMonitoring", "google-cloud-ops-agent"} DefaultPluginStateDirectory = filepath.Join(os.Getenv("PROGRAMDATA"), "Google/Compute Engine/google-guest-agent/agent_state/plugins/ops-agent-plugin") OpsAgentConfigLocationWindows = filepath.Join("C:", "Program Files/Google/Cloud Operations/Ops Agent/config/config.yaml") ) // RunSubAgentCommandFunc defines a function type that starts a subagent. If one subagent execution exited, other sugagents are also terminated via context cancellation. This abstraction is introduced // primarily to facilitate testing by allowing the injection of mock // implementations. type RunSubAgentCommandFunc func(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) // Apply applies the config sent or performs the work defined in the message. // ApplyRequest is opaque to the agent and is expected to be well known contract // between Plugin and the server itself. For e.g. service might want to update // plugin config to enable/disable feature here plugins can react to such requests. func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) { panic("Apply method is not implemented on Windows yet") } // Start starts the plugin and initiates the plugin functionality. // Until plugin receives Start request plugin is expected to be not functioning // and just listening on the address handed off waiting for the request. func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest) (*pb.StartResponse, error) { ps.mu.Lock() if ps.cancel != nil { log.Printf("The Ops Agent plugin is started already, skipping the current request") ps.mu.Unlock() return &pb.StartResponse{}, nil } log.Printf("Received a Start request: %s. Starting the Ops Agent", msg) pContext, cancel := context.WithCancel(context.Background()) ps.cancel = cancel ps.mu.Unlock() // Detect conflicting installations. preInstalledAgents, err := findPreExistentAgents(&windowsServiceManager{}, AgentWindowsServiceName) if len(preInstalledAgents) != 0 || err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) log.Printf("Start() failed: %s", err) return nil, status.Error(9, err.Error()) // FailedPrecondition } // Calculate plugin install and state dirs. pluginInstallDir, err := osext.ExecutableFolder() if err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) log.Printf("Start() failed, because it cannot determine the plugin install location: %s", err) return nil, status.Error(13, err.Error()) // Internal } pluginStateDir := msg.GetConfig().GetStateDirectoryPath() if pluginStateDir == "" { pluginStateDir = DefaultPluginStateDirectory } log.Printf("Determined pluginInstallDir: %v, and pluginStateDir: %v", pluginInstallDir, pluginStateDir) // Create a windows Event logger. This is used to log generated subagent configs, and health check results. windowsEventLogger, err := createWindowsEventLogger() if err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) log.Printf("Start() failed, because it failed to create Windows event logger: %s", err) return nil, status.Error(13, err.Error()) // Internal } // Receive config from the Start request and write it to the Ops Agent config file. if err := writeCustomConfigToFile(msg, OpsAgentConfigLocationWindows); err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) windowsEventLogger.Close() log.Printf("Start() failed, because it failed to write the custom Ops Agent config to file: %s", err) return nil, status.Errorf(13, "failed to write the custom Ops Agent config to file: %s", err) // Internal } // Subagents config validation and generation. if err := generateSubAgentConfigs(ctx, OpsAgentConfigLocationWindows, pluginStateDir, windowsEventLogger); err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) windowsEventLogger.Close() log.Printf("Start() failed at the subagent config validation and generation step: %s", err) return nil, status.Error(9, err.Error()) // FailedPrecondition } // Trigger Healthchecks. healthCheckFileLogger := healthchecks.CreateHealthChecksLogger(filepath.Join(pluginStateDir, LogsDirectory)) runHealthChecks(healthCheckFileLogger, windowsEventLogger) // Create a Windows Job object and stores its handle, to ensure that all child processes are killed when the parent process exits. _, err = createWindowsJobHandle() if err != nil { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) windowsEventLogger.Close() log.Printf("Start() failed, because it failed to create a Windows Job object: %s", err) return nil, status.Error(13, err.Error()) // Internal } cancelFunc := func() { ps.Stop(ctx, &pb.StopRequest{Cleanup: false}) windowsEventLogger.Close() } go runSubagents(pContext, cancelFunc, pluginInstallDir, pluginStateDir, runSubAgentCommand, ps.runCommand) return &pb.StartResponse{}, nil } // Stop is the stop hook and implements any cleanup if required. // Stop maybe called if plugin revision is being changed. // For e.g. if plugins want to stop some task it was performing or remove some // state before exiting it can be done on this request. func (ps *OpsAgentPluginServer) Stop(ctx context.Context, msg *pb.StopRequest) (*pb.StopResponse, error) { ps.mu.Lock() defer ps.mu.Unlock() if ps.cancel == nil { log.Printf("The Ops Agent plugin is stopped already, skipping the current request") return &pb.StopResponse{}, nil } log.Printf("Received a Stop request: %s. Stopping the Ops Agent", msg) ps.cancel() ps.cancel = nil return &pb.StopResponse{}, nil } // GetStatus is the health check agent would perform to make sure plugin process // is alive. If request fails process is considered dead and relaunched. Plugins // can share any additional information to report it to the service. For e.g. if // plugins detect some non-fatal errors causing it unable to offer some features // it can reported in status which is sent back to the service by agent. func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatusRequest) (*pb.Status, error) { log.Println("Received a GetStatus request") ps.mu.Lock() defer ps.mu.Unlock() if ps.cancel == nil { log.Println("The Ops Agent plugin is not running") return &pb.Status{Code: 1, Results: []string{"The Ops Agent Plugin is not running."}}, nil } log.Println("The Ops Agent plugin is running") return &pb.Status{Code: 0, Results: []string{"The Ops Agent Plugin is running ok."}}, nil } // serviceManager is an interface to abstract the Windows service manager. This is used to facilitate testing. type serviceManager interface { Connect() (serviceManagerConnection, error) } // serviceManagerConnection is an interface to abstract the connection to the Windows service manager. This is used to facilitate testing. type serviceManagerConnection interface { ListServices() ([]string, error) Disconnect() error } type windowsServiceManager struct{} type windowsServiceManagerConn struct { mgr *mgr.Mgr } func (w *windowsServiceManager) Connect() (serviceManagerConnection, error) { m, err := mgr.Connect() if err != nil { return nil, err } return &windowsServiceManagerConn{mgr: m}, nil } func (c *windowsServiceManagerConn) ListServices() ([]string, error) { return c.mgr.ListServices() } func (c *windowsServiceManagerConn) Disconnect() error { return c.mgr.Disconnect() } // findPreExistentAgents checks if any of the Ops Agent and legacy agents are already installed as Window Services. func findPreExistentAgents(mgr serviceManager, agentWindowsServiceNames []string) ([]string, error) { conn, err := mgr.Connect() if err != nil { return nil, fmt.Errorf("failed to connect to service manager: %s", err) } defer conn.Disconnect() installedServices, err := conn.ListServices() if err != nil { return nil, fmt.Errorf("failed to list installed Windows services: %s", err) } installedServicesSet := make(map[string]bool) for _, s := range installedServices { installedServicesSet[s] = true } alreadyInstalledAgentServiceNames := []string{} for _, s := range agentWindowsServiceNames { if installedServicesSet[s] { alreadyInstalledAgentServiceNames = append(alreadyInstalledAgentServiceNames, s) } } if len(alreadyInstalledAgentServiceNames) != 0 { return alreadyInstalledAgentServiceNames, fmt.Errorf("conflicting installations identified: %v", alreadyInstalledAgentServiceNames) } return alreadyInstalledAgentServiceNames, nil } // eventLogWriter implements the io.Writer interface. It writes logs to the Windows Event Log. type eventLogWriter struct { EventID uint32 EventLog *eventlog.Log } func (w *eventLogWriter) Write(p []byte) (int, error) { err := w.EventLog.Info(w.EventID, string(p)) if err != nil { return 0, err } return len(p), nil } func createWindowsEventLogger() (debug.Log, error) { eventlog.InstallAsEventCreate(WindowsEventLogIdentifier, eventlog.Error|eventlog.Warning|eventlog.Info) elog, err := eventlog.Open(WindowsEventLogIdentifier) if err != nil { return nil, err } // ConfGenerator might log messages to stdout, redirect them to the windows event log. log.SetOutput(&eventLogWriter{ EventID: OpsAgentUAPPluginEventID, EventLog: elog, }) return elog, nil } func generateSubAgentConfigs(ctx context.Context, userConfigPath string, pluginStateDir string, windowsEventLogger debug.Log) error { uc, err := confgenerator.MergeConfFiles(ctx, userConfigPath, apps.BuiltInConfStructs) if err != nil { return err } windowsEventLogger.Info(OpsAgentUAPPluginEventID, fmt.Sprintf("Built-in config:\n%s\n", apps.BuiltInConfStructs["windows"])) windowsEventLogger.Info(OpsAgentUAPPluginEventID, fmt.Sprintf("Merged config:\n%s\n", uc)) // The generated otlp metric json files are used only by the otel service. if err = self_metrics.GenerateOpsAgentSelfMetricsOTLPJSON(ctx, userConfigPath, filepath.Join(pluginStateDir, GeneratedConfigsOutDir, "otel")); err != nil { return err } for _, subagent := range []string{ "otel", "fluentbit", } { if err := uc.GenerateFilesFromConfig( ctx, subagent, filepath.Join(pluginStateDir, LogsDirectory), filepath.Join(pluginStateDir, RuntimeDirectory), filepath.Join(pluginStateDir, GeneratedConfigsOutDir, subagent)); err != nil { return err } } return nil } func runHealthChecks(healthCheckFileLogger logs.StructuredLogger, windowsEventLogger debug.Log) { gceHealthChecks := healthchecks.HealthCheckRegistryFactory() // Log health check results to health-checks.log log file. healthCheckResults := gceHealthChecks.RunAllHealthChecks(healthCheckFileLogger) // Log health check results to windows event log too. healthCheckWindowsEventLogger := logs.WindowsServiceLogger{EventID: OpsAgentUAPPluginEventID, Logger: windowsEventLogger} healthchecks.LogHealthCheckResults(healthCheckResults, healthCheckWindowsEventLogger) windowsEventLogger.Info(OpsAgentUAPPluginEventID, "Health checks completed") } func createWindowsJobHandle() (windows.Handle, error) { jobHandle, err := windows.CreateJobObject(nil, nil) if err != nil { return 0, err } info := windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION{ BasicLimitInformation: windows.JOBOBJECT_BASIC_LIMIT_INFORMATION{ LimitFlags: windows.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, }, } _, err = windows.SetInformationJobObject( jobHandle, windows.JobObjectExtendedLimitInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info))) if err != nil { windows.CloseHandle(jobHandle) return 0, err } // Assign the current process to the job object. This ensures that all child processes are automatically assigned to the same Job object. err = windows.AssignProcessToJobObject(jobHandle, windows.CurrentProcess()) if err != nil { windows.CloseHandle(jobHandle) return 0, err } return jobHandle, nil } // runSubagents starts up otel and fluent bit subagents in separate goroutines. // All child goroutines create a new context derived from the same parent context. // This ensures that crashes in one goroutine don't affect other goroutines. // However, when one goroutine exits with errors, it won't be restarted, and all other goroutines are also terminated. // This is done by canceling the parent context. // This makes sure that GetStatus() returns a non-healthy status, signaling UAP to Start() the plugin again. // // ctx: the parent context that all child goroutines share. // // cancel: the cancel function for the parent context. By calling this function, the parent context is canceled, // and GetStatus() returns a non-healthy status, signaling UAP to re-trigger Start(). func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallDirectory string, pluginStateDirectory string, runSubAgentCommand RunSubAgentCommandFunc, runCommand RunCommandFunc) { var wg sync.WaitGroup // Starting Otel runOtelCmd := exec.CommandContext(ctx, path.Join(pluginInstallDirectory, OtelBinary), "--config", path.Join(pluginStateDirectory, GeneratedConfigsOutDir, "otel/otel.yaml"), ) wg.Add(1) go runSubAgentCommand(ctx, cancel, runOtelCmd, runCommand, &wg) // Starting Fluentbit runFluentBitCmd := exec.CommandContext(ctx, path.Join(pluginInstallDirectory, AgentWrapperBinary), "-config_path", OpsAgentConfigLocationWindows, "-log_path", path.Join(pluginStateDirectory, LogsDirectory, "logging-module.log"), path.Join(pluginInstallDirectory, FluentbitBinary), "-c", path.Join(pluginStateDirectory, GeneratedConfigsOutDir, "fluentbit/fluent_bit_main.conf"), "-R", path.Join(pluginStateDirectory, GeneratedConfigsOutDir, "fluentbit/fluent_bit_parser.conf"), "--storage_path", path.Join(pluginStateDirectory, "run/buffers"), ) wg.Add(1) go runSubAgentCommand(ctx, cancel, runFluentBitCmd, runCommand, &wg) wg.Wait() } func runCommand(cmd *exec.Cmd) (string, error) { if cmd == nil { return "", nil } log.Printf("Running command: %s", cmd.Args) out, err := cmd.CombinedOutput() if err != nil { log.Printf("Command %s failed, \ncommand output: %s\ncommand error: %s", cmd.Args, string(out), err) } return string(out), err } func runSubAgentCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) { defer wg.Done() if cmd == nil { return } if ctx.Err() != nil { // context has been cancelled log.Printf("cannot execute command: %s, because the context has been cancelled", cmd.Args) return } output, err := runCommand(cmd) if err != nil { log.Printf("command: %s exited with errors, not restarting.\nCommand output: %s\n Command error:%s", cmd.Args, string(output), err) } else { log.Printf("command: %s %s exited successfully.\nCommand output: %s", cmd.Path, cmd.Args, string(output)) } cancel() // cancels the parent context which also stops other Ops Agent sub-binaries from running. return }