in go/protocol/command_executor.go [180:250]
// onMsg processes one incoming command request: it validates the publish,
// runs the handler through ce.cache.Exec, and publishes the built response.
//
// An error is returned, and ce.ack is not called, when ignoreRequest rejects
// the publish, when the message expiry is zero, or when ce.cache.Exec returns
// an error. Past that point ce.ack is deferred, so it runs on every remaining
// return path, and onMsg returns nil even if publishing the response fails.
func (ce *CommandExecutor[Req, Res]) onMsg(
	ctx context.Context,
	pub *mqtt.Message,
	msg *Message[Req],
) error {
	ce.log.Debug(
		ctx,
		"request received",
		slog.String("topic", pub.Topic),
		slog.Any("correlation_data", pub.CorrelationData),
	)

	if err := ignoreRequest(pub); err != nil {
		return err
	}

	// A zero expiry is reported as a missing header.
	if pub.MessageExpiry == 0 {
		missing := errors.HeaderMissing{
			HeaderName: constants.MessageExpiry,
		}
		return &errors.Remote{
			Message: "message expiry missing",
			Kind:    missing,
		}
	}

	// invoke decodes the payload, runs the handler under both the executor
	// timeout and the per-publish timeout, and builds the response message.
	invoke := func() (*mqtt.Message, error) {
		// Copy the message before decoding so the request holds the
		// message as it was received.
		req := new(CommandRequest[Req])
		req.Message = *msg

		var err error
		req.Payload, err = ce.listener.payload(msg)
		if err != nil {
			return nil, err
		}

		// The two timeouts are layered: the second context derives from
		// the first, and the deferred cancels run in reverse order.
		hctx, cancelExecutor := ce.timeout.Context(ctx)
		defer cancelExecutor()
		hctx, cancelRequest := pubTimeout(pub).Context(hctx)
		defer cancelRequest()

		result, err := ce.handle(hctx, req)
		if err != nil {
			return nil, err
		}

		built, err := ce.build(pub, result, nil)
		if err != nil {
			return nil, err
		}
		return built, nil
	}

	// NOTE(review): ce.cache.Exec presumably decides whether invoke runs or
	// a previously stored response is reused — confirm against the cache.
	reply, err := ce.cache.Exec(pub, invoke)
	if err != nil {
		return err
	}

	// From here on the request is acked when onMsg returns, i.e. after the
	// publish attempt below.
	defer ce.ack(ctx, pub)

	// Nothing to send back.
	if reply == nil {
		return nil
	}

	if err = ce.publisher.publish(ctx, reply); err != nil {
		// If the publish fails onErr will also fail, so just drop the message.
		ce.listener.drop(ctx, pub, err)
		return nil
	}

	ce.log.Debug(
		ctx,
		"response sent",
		slog.String("topic", reply.Topic),
		slog.Any("correlation_data", reply.CorrelationData),
	)
	return nil
}