func()

in go/protocol/command_executor.go [180:250]


// onMsg processes one incoming command request publish.
//
// It logs the request, rejects it if ignoreRequest reports an error or if the
// publish has a zero message expiry, and then hands the request to
// ce.cache.Exec together with a callback that decodes the payload, runs the
// handler under both the executor timeout and the publish-derived timeout,
// and builds the response publish.
//
// Any error up to and including ce.cache.Exec is returned to the caller, and
// the request is not acked by this method in that case. After that point the
// request is acked when onMsg returns and the result is always nil: a nil
// response means there is nothing to publish, and a failed response publish
// is dropped through the listener instead of being surfaced.
func (ce *CommandExecutor[Req, Res]) onMsg(
	ctx context.Context,
	pub *mqtt.Message,
	msg *Message[Req],
) error {
	ce.log.Debug(ctx, "request received",
		slog.String("topic", pub.Topic),
		slog.Any("correlation_data", pub.CorrelationData),
	)

	if err := ignoreRequest(pub); err != nil {
		return err
	}

	// A request without an expiry is reported as a missing-header error.
	if pub.MessageExpiry == 0 {
		return &errors.Remote{
			Message: "message expiry missing",
			Kind: errors.HeaderMissing{
				HeaderName: constants.MessageExpiry,
			},
		}
	}

	// produce builds the response publish for this request. Whether and when
	// it is invoked is decided by ce.cache.Exec.
	produce := func() (*mqtt.Message, error) {
		request := &CommandRequest[Req]{Message: *msg}
		var err error

		request.Payload, err = ce.listener.payload(msg)
		if err != nil {
			return nil, err
		}

		// Bound the handler by the executor timeout first, then by the
		// timeout derived from the publish itself.
		execCtx, cancelExec := ce.timeout.Context(ctx)
		defer cancelExec()

		handlerCtx, cancelPub := pubTimeout(pub).Context(execCtx)
		defer cancelPub()

		response, err := ce.handle(handlerCtx, request)
		if err != nil {
			return nil, err
		}

		built, err := ce.build(pub, response, nil)
		if err != nil {
			return nil, err
		}

		return built, nil
	}

	rpub, err := ce.cache.Exec(pub, produce)
	if err != nil {
		return err
	}

	// From here on the request is acked on every return path.
	defer ce.ack(ctx, pub)

	// No response to send.
	if rpub == nil {
		return nil
	}

	if err = ce.publisher.publish(ctx, rpub); err != nil {
		// If the publish fails onErr will also fail, so just drop the message.
		ce.listener.drop(ctx, pub, err)
		return nil
	}

	ce.log.Debug(ctx, "response sent",
		slog.String("topic", rpub.Topic),
		slog.Any("correlation_data", rpub.CorrelationData),
	)
	return nil
}