func()

in internal/acs/handler/handler.go [85:140]


// handleMessage is the ACS watcher event callback. It validates the event
// payload and hands the request to a dedicated goroutine, tracked by f.worker,
// so that handling a request never blocks delivery of new watcher events.
//
// eventType and data are not used by this handler. The return value is always
// true, including when the event carries an error or a payload of an
// unexpected type; such events are logged and ignored.
// NOTE(review): returning true presumably keeps the subscription active -
// confirm against the events package contract.
func (f *dataFetchers) handleMessage(ctx context.Context, eventType string, data any, event *events.EventData) bool {
	if event.Error != nil {
		galog.Warnf("ACS event watcher failed, ignoring: %v", event.Error)
		return true
	}

	msg, ok := event.Data.(*acpb.MessageBody)
	if !ok {
		galog.Warnf("ACS watcher sent invalid data of type %T, ignoring", event.Data)
		return true
	}

	f.worker.Add(1)
	// Running a separate goroutine to handle the request prevents blocking new
	// watcher events.
	// The goroutine handles the request and exits. In case of a configure
	// request it waits until all the plugins specified in the request are
	// processed.
	go func() {
		defer f.worker.Done()

		messageType := msg.Labels[messageTypeLabel]
		reqID := msg.Labels["request_id"]

		galog.Debugf("Handling %q (request id: %q) request", messageType, reqID)

		// Build the response labels on a private copy so the label map of the
		// received message is never modified by this handler.
		labels := make(map[string]string, len(msg.Labels))
		for k, v := range msg.Labels {
			labels[k] = v
		}

		var resp proto.Message
		switch messageType {
		case getAgentInfoMsg:
			resp = f.agentInfo()
			labels[messageTypeLabel] = agentInfoMsg
		case getOSInfoMsg:
			resp = f.osInfo()
			labels[messageTypeLabel] = osInfoMsg
		case listPluginStatesMsg:
			resp = f.listPluginStates(ctx, msg)
			labels[messageTypeLabel] = currentPluginStatesMsg
		case configurePluginStatesMsg:
			f.configurePluginStates(ctx, msg)
		default:
			galog.Warnf("Unknown message type: %q, ignoring", messageType)
			// Nothing was handled, so do not report the request as completed.
			return
		}

		// Response will be nil in case of a [configurePluginStatesMsg] request as
		// it's handled asynchronously and notified about the events later.
		if resp != nil {
			if err := client.Send(ctx, labels, resp); err != nil {
				galog.Warnf("Failed to send message: %v", err)
				// The reply never reached ACS; skip the success log below.
				return
			}
		}

		galog.Debugf("Successfully completed %q (request id: %q) request", messageType, reqID)
	}()

	return true
}