internal/acs/handler/handler.go (134 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package handler contains the implementation of handling ACS requests.
package handler
import (
"context"
"runtime"
"sync"
acpb "github.com/GoogleCloudPlatform/agentcommunication_client/gapic/agentcommunicationpb"
"github.com/GoogleCloudPlatform/galog"
acppb "github.com/GoogleCloudPlatform/google-guest-agent/internal/acp/proto/google_guest_agent/acp"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/acs/client"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/acs/watcher"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/cfg"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/events"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/osinfo"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/manager"
"google.golang.org/protobuf/proto"
anypb "google.golang.org/protobuf/types/known/anypb"
)
const (
// subscriberID is the subscriber ID for ACS message handler.
subscriberID = "ACS-message-handler"
// messageTypeLabel is key in labels for message type.
messageTypeLabel = "message_type"
// Following are the known message types Guest Agent will support.
// getAgentInfoMsg is the message type label for GetAgentInfo request.
getAgentInfoMsg = "agent_controlplane.GetAgentInfo"
// agentInfoMsg is the message type label for AgentInfo response.
agentInfoMsg = "agent_controlplane.AgentInfo"
// getOSInfoMsg is the message type label for GetOSInfo request.
getOSInfoMsg = "agent_controlplane.GetOSInfo"
// osInfoMsg is the message type label for OSInfo response.
osInfoMsg = "agent_controlplane.OSInfo"
// listPluginStatesMsg is the message type label for ListPluginStates request.
listPluginStatesMsg = "agent_controlplane.ListPluginStates"
// currentPluginStatesMsg is the message type label for CurrentPluginStates response.
currentPluginStatesMsg = "agent_controlplane.CurrentPluginStates"
// configurePluginStatesMsg is the message type label for ConfigurePluginStates request.
configurePluginStatesMsg = "agent_controlplane.ConfigurePluginStates"
)
// Init initializes the ACS handler that starts listening to ACS messages.
func Init(ver string) {
if !cfg.Retrieve().Core.ACSClient {
galog.V(2).Debugf("ACS client is disabled, skipping handler registration")
return
}
d := &dataFetchers{clientVersion: ver, osInfoReader: osinfo.Read, pluginManager: manager.Instance()}
sub := events.EventSubscriber{Name: subscriberID, Data: nil, Callback: d.handleMessage}
events.FetchManager().Subscribe(watcher.MessageReceiver, sub)
}
// dataFetchers wraps the fetchers required for collecting information and
// supporting requests from ACS, this helps having test doubles in unit tests.
// Actual fetchers make system calls in some case which must be avoided.
type dataFetchers struct {
// worker is used to wait for the requests to complete. This is used only in
// unit tests.
worker sync.WaitGroup
// clientVersion is the version of the running guest agent.
clientVersion string
osInfoReader func() osinfo.OSInfo
pluginManager *manager.PluginManager
}
// handleMessage handles request from ACS channel and always returns true to continue
// listening to watcher events.
func (f *dataFetchers) handleMessage(ctx context.Context, eventType string, data any, event *events.EventData) bool {
if event.Error != nil {
galog.Warnf("ACS event watcher failed, ignoring: %v", event.Error)
return true
}
var resp proto.Message
msg, ok := event.Data.(*acpb.MessageBody)
if !ok {
galog.Warnf("ACS watcher sent invalid data of type %T, ignoring", event.Data)
return true
}
f.worker.Add(1)
// Running separate go routine to handle the request prevents from blocking
// the new watcher events.
// Go routine handles the request and exits. In case of configure request it
// waits until all the plugin specified in the request are and processed.
go func() {
defer f.worker.Done()
messageType := msg.Labels[messageTypeLabel]
reqID := msg.Labels["request_id"]
labels := msg.Labels
galog.Debugf("Handling %q (request id: %q) request", messageType, reqID)
switch messageType {
case getAgentInfoMsg:
resp = f.agentInfo()
labels[messageTypeLabel] = agentInfoMsg
case getOSInfoMsg:
resp = f.osInfo()
labels[messageTypeLabel] = osInfoMsg
case listPluginStatesMsg:
resp = f.listPluginStates(ctx, msg)
labels[messageTypeLabel] = currentPluginStatesMsg
case configurePluginStatesMsg:
f.configurePluginStates(ctx, msg)
default:
galog.Warnf("Unknown message type: %q, ignoring", messageType)
}
// Response will be [nil] in case of [configurePluginStatesMsg] request as
// its handled asynchronously and notified about the events later.
if resp != nil {
if err := client.Send(ctx, labels, resp); err != nil {
galog.Warnf("Failed to send message: %v", err)
}
}
galog.Debugf("Successfully completed %q (request id: %q) request", messageType, reqID)
}()
return true
}
// agentInfo returns the AgentInfo proto message.
func (f *dataFetchers) agentInfo() *acppb.AgentInfo {
return &acppb.AgentInfo{
Name: "GCEGuestAgent",
Architecture: runtime.GOARCH,
AgentCapabilities: []acppb.AgentInfo_AgentCapability{
acppb.AgentInfo_GET_AGENT_INFO,
acppb.AgentInfo_GET_OS_INFO,
acppb.AgentInfo_LIST_PLUGIN_STATES,
acppb.AgentInfo_CONFIGURE_PLUGIN_STATES,
},
Version: f.clientVersion,
}
}
// osInfo reads the OSInfo and returns the OSInfo proto message.
func (f *dataFetchers) osInfo() *acppb.OSInfo {
info := f.osInfoReader()
return &acppb.OSInfo{
Architecture: info.Architecture,
Type: runtime.GOOS,
Version: info.VersionID,
ShortName: info.OS,
LongName: info.PrettyName,
KernelRelease: info.KernelRelease,
KernelVersion: info.KernelVersion,
}
}
func (f *dataFetchers) configurePluginStates(ctx context.Context, msg *acpb.MessageBody) {
req := new(acppb.ConfigurePluginStates)
if err := anypb.UnmarshalTo(msg.Body, req, proto.UnmarshalOptions{}); err != nil {
galog.Warnf("Failed to unmarshal ConfigurePluginStates request: %v", err)
return
}
// Don't process the request from ACS if the plugin manager is not initialized
// yet. This is to avoid trying to configure the plugins that may already
// exist on disk.
if !f.pluginManager.IsInitialized.Load() {
galog.Warnf("Plugin manager is not initialized yet, ignoring ConfigurePluginStates request")
return
}
f.pluginManager.ConfigurePluginStates(ctx, req, false)
}
func (f *dataFetchers) listPluginStates(ctx context.Context, msg *acpb.MessageBody) *acppb.CurrentPluginStates {
// Don't process the request from ACS if the plugin manager is not initialized
// yet. This is to avoid trying to list the plugins that may already
// exist on disk.
if !f.pluginManager.IsInitialized.Load() {
galog.Warnf("Plugin manager is not initialized yet, ignoring ListPluginStates request")
return nil
}
req := new(acppb.ListPluginStates)
if err := anypb.UnmarshalTo(msg.Body, req, proto.UnmarshalOptions{}); err != nil {
galog.Warnf("Failed to unmarshal ListPluginStates request: %v", err)
return nil
}
return f.pluginManager.ListPluginStates(ctx, req)
}