func NewCommandExecutor[Req, Res any]()

in go/protocol/command_executor.go [89:168]


func NewCommandExecutor[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	handler CommandHandler[Req, Res],
	opt ...CommandExecutorOption,
) (ce *CommandExecutor[Req, Res], err error) {
	var opts CommandExecutorOptions
	opts.Apply(opt)

	logger := log.Wrap(opts.Logger, app.log)
	defer func() { err = errutil.Return(err, logger, true) }()

	if err := errutil.ValidateNonNil(map[string]any{
		"client":           client,
		"requestEncoding":  requestEncoding,
		"responseEncoding": responseEncoding,
		"handler":          handler,
	}); err != nil {
		return nil, err
	}

	to := &internal.Timeout{
		Duration: opts.Timeout,
		Name:     "ExecutionTimeout",
		Text:     commandExecutorErrStr,
	}
	if err := to.Validate(); err != nil {
		return nil, err
	}

	if err := internal.ValidateShareName(opts.ShareName); err != nil {
		return nil, err
	}

	reqTP, err := internal.NewTopicPattern(
		"requestTopicPattern",
		requestTopicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	reqTF, err := reqTP.Filter()
	if err != nil {
		return nil, err
	}

	ce = &CommandExecutor[Req, Res]{
		handler: handler,
		timeout: to,
		cache:   caching.New(wallclock.Instance),
		log:     logger,
	}
	ce.listener = &listener[Req]{
		app:              app,
		client:           client,
		encoding:         requestEncoding,
		topic:            reqTF,
		shareName:        opts.ShareName,
		concurrency:      opts.Concurrency,
		reqCorrelation:   true,
		supportedVersion: version.RPCSupported,
		log:              logger,
		handler:          ce,
	}
	ce.publisher = &publisher[Res]{
		app:      app,
		client:   client,
		encoding: responseEncoding,
		version:  version.RPC,
	}

	ce.listener.register()
	return ce, nil
}