in go/protocol/command_invoker.go [92:200]
// NewCommandInvoker builds a CommandInvoker that publishes requests on
// requestTopicPattern and listens for responses on a derived response topic.
//
// The response topic pattern is chosen as follows:
//   - if opts.ResponseTopicPattern is set, it is used verbatim;
//   - otherwise, if a response topic prefix and/or suffix is set, each one is
//     validated and joined around the request topic pattern with "/";
//   - otherwise, the request topic pattern is prefixed with
//     "clients/<client ID>/".
//
// client, requestEncoding, and responseEncoding must be non-nil; any failure
// (nil argument, invalid prefix/suffix, invalid topic pattern, or failure to
// build the response topic filter) yields a nil invoker and an error that is
// passed through errutil.Return before reaching the caller.
//
// NOTE(review): app.log is read before any validation and app is not part of
// the non-nil check, so a nil app appears to panic here — confirm whether
// callers are guaranteed to pass a non-nil Application.
func NewCommandInvoker[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	opt ...CommandInvokerOption,
) (ci *CommandInvoker[Req, Res], err error) {
	var opts CommandInvokerOptions
	opts.Apply(opt)

	logger := log.Wrap(opts.Logger, app.log)

	// Every error leaving this function is routed through errutil.Return
	// together with the logger, which is why the results are named.
	defer func() { err = errutil.Return(err, logger, true) }()

	required := map[string]any{
		"client":           client,
		"requestEncoding":  requestEncoding,
		"responseEncoding": responseEncoding,
	}
	if err = errutil.ValidateNonNil(required); err != nil {
		return nil, err
	}

	prefix, suffix := opts.ResponseTopicPrefix, opts.ResponseTopicSuffix
	responseTopicPattern := opts.ResponseTopicPattern

	switch {
	case responseTopicPattern != "":
		// An explicit response topic pattern takes precedence over any
		// prefix/suffix options; nothing to derive.

	case prefix == "" && suffix == "":
		// If no options were provided, apply a well-known prefix. This
		// ensures that the response topic is different from the request
		// topic and lets us document this pattern for auth configuration.
		// Note that this does not use any topic tokens, since we cannot
		// guarantee their existence.
		responseTopicPattern = "clients/" + client.ID() + "/" + requestTopicPattern

	default:
		// Wrap the request topic pattern with whichever of the prefix and
		// suffix were supplied, validating the prefix first.
		responseTopicPattern = requestTopicPattern

		if prefix != "" {
			if err = internal.ValidateTopicPatternComponent(
				"responseTopicPrefix",
				"invalid response topic prefix",
				prefix,
			); err != nil {
				return nil, err
			}
			responseTopicPattern = prefix + "/" + responseTopicPattern
		}

		if suffix != "" {
			if err = internal.ValidateTopicPatternComponent(
				"responseTopicSuffix",
				"invalid response topic suffix",
				suffix,
			); err != nil {
				return nil, err
			}
			responseTopicPattern += "/" + suffix
		}
	}

	// Both patterns share the same topic tokens and namespace.
	requestTopic, err := internal.NewTopicPattern(
		"requestTopicPattern",
		requestTopicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	responseTopic, err := internal.NewTopicPattern(
		"responseTopicPattern",
		responseTopicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	// The listener works from the filter derived from the response pattern.
	responseFilter, err := responseTopic.Filter()
	if err != nil {
		return nil, err
	}

	ci = &CommandInvoker[Req, Res]{
		responseTopic: responseTopic,
		log:           logger,
		pending:       container.NewSyncMap[string, commandPending[Res]](),
		publisher: &publisher[Req]{
			app:      app,
			client:   client,
			encoding: requestEncoding,
			topic:    requestTopic,
			version:  version.RPC,
		},
	}

	// The listener needs the invoker itself as its handler, so it is attached
	// only after the invoker has been allocated.
	ci.listener = &listener[Res]{
		app:              app,
		client:           client,
		encoding:         responseEncoding,
		topic:            responseFilter,
		reqCorrelation:   true,
		supportedVersion: version.RPCSupported,
		log:              logger,
		handler:          ci,
	}
	ci.listener.register()

	return ci, nil
}