in go/protocol/command_executor.go [89:168]
// NewCommandExecutor constructs a CommandExecutor for the given request topic
// pattern, wiring together the supplied handler, a request listener that
// decodes with requestEncoding, and a response publisher that encodes with
// responseEncoding.
//
// Construction fails (returning a nil executor) when any of client,
// requestEncoding, responseEncoding, or handler is nil, when the configured
// timeout or share name does not validate, or when the request topic pattern
// cannot be built or turned into a filter. Every returned error is passed
// through errutil.Return together with the executor's logger before reaching
// the caller.
//
// On success the listener is registered before the executor is returned.
func NewCommandExecutor[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	handler CommandHandler[Req, Res],
	opt ...CommandExecutorOption,
) (ce *CommandExecutor[Req, Res], err error) {
	var cfg CommandExecutorOptions
	cfg.Apply(opt)

	// NOTE(review): app is dereferenced here and is not part of the non-nil
	// validation below, so a nil app panics rather than returning an error.
	lg := log.Wrap(cfg.Logger, app.log)

	// Route whatever error this constructor ends up returning through
	// errutil.Return; the named result lets the deferred call rewrite it.
	defer func() {
		err = errutil.Return(err, lg, true)
	}()

	required := map[string]any{
		"client":           client,
		"requestEncoding":  requestEncoding,
		"responseEncoding": responseEncoding,
		"handler":          handler,
	}
	if nilErr := errutil.ValidateNonNil(required); nilErr != nil {
		return nil, nilErr
	}

	execTimeout := &internal.Timeout{
		Duration: cfg.Timeout,
		Name:     "ExecutionTimeout",
		Text:     commandExecutorErrStr,
	}
	if timeoutErr := execTimeout.Validate(); timeoutErr != nil {
		return nil, timeoutErr
	}

	if shareErr := internal.ValidateShareName(cfg.ShareName); shareErr != nil {
		return nil, shareErr
	}

	// Resolve the request topic pattern first, then derive the filter the
	// listener is configured with.
	pattern, err := internal.NewTopicPattern(
		"requestTopicPattern",
		requestTopicPattern,
		cfg.TopicTokens,
		cfg.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}
	filter, err := pattern.Filter()
	if err != nil {
		return nil, err
	}

	ce = &CommandExecutor[Req, Res]{
		handler: handler,
		timeout: execTimeout,
		cache:   caching.New(wallclock.Instance),
		log:     lg,
		publisher: &publisher[Res]{
			app:      app,
			client:   client,
			encoding: responseEncoding,
			version:  version.RPC,
		},
	}

	// The listener refers back to the executor as its handler, so it can only
	// be built once ce exists.
	reqListener := &listener[Req]{
		app:              app,
		client:           client,
		encoding:         requestEncoding,
		topic:            filter,
		shareName:        cfg.ShareName,
		concurrency:      cfg.Concurrency,
		reqCorrelation:   true,
		supportedVersion: version.RPCSupported,
		log:              lg,
		handler:          ce,
	}
	ce.listener = reqListener
	reqListener.register()

	return ce, nil
}