in go/protocol/command_invoker.go [216:291]
func (ci *CommandInvoker[Req, Res]) Invoke(
ctx context.Context,
req Req,
opt ...InvokeOption,
) (res *CommandResponse[Res], err error) {
var opts InvokeOptions
opts.Apply(opt)
shallow := true
defer func() { err = errutil.Return(err, ci.log, shallow) }()
timeout := opts.Timeout
if timeout == 0 {
timeout = DefaultTimeout
}
expiry := &internal.Timeout{
Duration: timeout,
Name: "MessageExpiry",
Text: commandInvokerErrStr,
}
if err := expiry.Validate(); err != nil {
return nil, err
}
correlationData, err := errutil.NewUUID()
if err != nil {
return nil, err
}
msg := &Message[Req]{
CorrelationData: correlationData,
Payload: req,
Metadata: opts.Metadata,
}
pub, err := ci.publisher.build(msg, opts.TopicTokens, expiry)
if err != nil {
return nil, err
}
pub.UserProperties[constants.Partition] = ci.publisher.client.ID()
pub.ResponseTopic, err = ci.responseTopic.Topic(opts.TopicTokens)
if err != nil {
return nil, err
}
listen, done := ci.initPending(string(pub.CorrelationData))
defer done()
shallow = false
err = ci.publisher.publish(ctx, pub)
if err != nil {
return nil, err
}
ci.log.Debug(ctx, "request sent",
slog.String("topic", pub.Topic),
slog.Any("correlation_data", pub.CorrelationData),
)
// If a message expiry was specified, also time out our own context, so that
// we stop listening for a response when none will come.
ctx, cancel := expiry.Context(ctx)
defer cancel()
select {
case res := <-listen:
ci.log.Debug(ctx, "response received",
slog.String("topic", pub.ResponseTopic),
slog.Any("correlation_data", pub.CorrelationData),
)
return res.res, res.err
case <-ctx.Done():
return nil, errutil.Context(ctx, commandInvokerErrStr)
}
}