in internal/acs/handler/handler.go [85:140]
// handleMessage is the callback invoked for ACS watcher events. It validates
// the event payload and hands the request to a dedicated goroutine so that
// handling one request never blocks delivery of subsequent watcher events.
//
// The eventType and data arguments are not used. Events carrying an error or
// a payload that is not an [*acpb.MessageBody] are logged and ignored.
//
// It always returns true. NOTE(review): presumably this tells the event
// manager to keep the subscription active - confirm against the events
// package.
func (f *dataFetchers) handleMessage(ctx context.Context, eventType string, data any, event *events.EventData) bool {
	if event.Error != nil {
		galog.Warnf("ACS event watcher failed, ignoring: %v", event.Error)
		return true
	}

	msg, ok := event.Data.(*acpb.MessageBody)
	if !ok {
		galog.Warnf("ACS watcher sent invalid data of type %T, ignoring", event.Data)
		return true
	}

	// Track the goroutine on f.worker so it can be waited on; Done is called
	// when the request handling below finishes.
	f.worker.Add(1)
	// Running a separate goroutine to handle the request prevents blocking new
	// watcher events. The goroutine handles the request and exits. In case of a
	// configure request it waits until all the plugins specified in the request
	// are processed.
	go func() {
		defer f.worker.Done()

		messageType := msg.Labels[messageTypeLabel]
		reqID := msg.Labels["request_id"]
		galog.Debugf("Handling %q (request id: %q) request", messageType, reqID)

		// Build the response labels in a private copy instead of writing into
		// msg.Labels: this goroutine outlives the callback, and mutating the
		// incoming message's map would race with any other reader of the same
		// event payload.
		labels := make(map[string]string, len(msg.Labels))
		for k, v := range msg.Labels {
			labels[k] = v
		}

		// resp stays nil for requests that produce no immediate response.
		var resp proto.Message
		switch messageType {
		case getAgentInfoMsg:
			resp = f.agentInfo()
			labels[messageTypeLabel] = agentInfoMsg
		case getOSInfoMsg:
			resp = f.osInfo()
			labels[messageTypeLabel] = osInfoMsg
		case listPluginStatesMsg:
			resp = f.listPluginStates(ctx, msg)
			labels[messageTypeLabel] = currentPluginStatesMsg
		case configurePluginStatesMsg:
			f.configurePluginStates(ctx, msg)
		default:
			galog.Warnf("Unknown message type: %q, ignoring", messageType)
			// Nothing was handled, so do not report the request as completed.
			return
		}

		// Response will be [nil] in case of [configurePluginStatesMsg] request as
		// it is handled asynchronously and notified about the events later.
		if resp != nil {
			if err := client.Send(ctx, labels, resp); err != nil {
				galog.Warnf("Failed to send message: %v", err)
				// The response never reached the caller; skip the success log.
				return
			}
		}
		galog.Debugf("Successfully completed %q (request id: %q) request", messageType, reqID)
	}()
	return true
}