func NewCommandInvoker[Req, Res any]()

in go/protocol/command_invoker.go [92:200]


func NewCommandInvoker[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	opt ...CommandInvokerOption,
) (ci *CommandInvoker[Req, Res], err error) {
	var opts CommandInvokerOptions
	opts.Apply(opt)

	logger := log.Wrap(opts.Logger, app.log)
	defer func() { err = errutil.Return(err, logger, true) }()

	if err := errutil.ValidateNonNil(map[string]any{
		"client":           client,
		"requestEncoding":  requestEncoding,
		"responseEncoding": responseEncoding,
	}); err != nil {
		return nil, err
	}

	responseTopicPattern := opts.ResponseTopicPattern
	if responseTopicPattern == "" {
		responseTopicPattern = requestTopicPattern

		if opts.ResponseTopicPrefix != "" {
			err = internal.ValidateTopicPatternComponent(
				"responseTopicPrefix",
				"invalid response topic prefix",
				opts.ResponseTopicPrefix,
			)
			if err != nil {
				return nil, err
			}
			responseTopicPattern = opts.ResponseTopicPrefix + "/" + responseTopicPattern
		}
		if opts.ResponseTopicSuffix != "" {
			err = internal.ValidateTopicPatternComponent(
				"responseTopicSuffix",
				"invalid response topic suffix",
				opts.ResponseTopicSuffix,
			)
			if err != nil {
				return nil, err
			}
			responseTopicPattern = responseTopicPattern + "/" + opts.ResponseTopicSuffix
		}

		// If no options were provided, apply a well-known prefix. This ensures
		// that the response topic is different from the request topic and lets
		// us document this pattern for auth configuration. Note that this does
		// not use any topic tokens, since we cannot guarantee their existence.
		if opts.ResponseTopicPrefix == "" && opts.ResponseTopicSuffix == "" {
			responseTopicPattern = "clients/" + client.ID() + "/" + requestTopicPattern
		}
	}

	reqTP, err := internal.NewTopicPattern(
		"requestTopicPattern",
		requestTopicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	resTP, err := internal.NewTopicPattern(
		"responseTopicPattern",
		responseTopicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	resTF, err := resTP.Filter()
	if err != nil {
		return nil, err
	}

	ci = &CommandInvoker[Req, Res]{
		responseTopic: resTP,
		log:           logger,
		pending:       container.NewSyncMap[string, commandPending[Res]](),
	}
	ci.publisher = &publisher[Req]{
		app:      app,
		client:   client,
		encoding: requestEncoding,
		topic:    reqTP,
		version:  version.RPC,
	}
	ci.listener = &listener[Res]{
		app:              app,
		client:           client,
		encoding:         responseEncoding,
		topic:            resTF,
		reqCorrelation:   true,
		supportedVersion: version.RPCSupported,
		log:              logger,
		handler:          ci,
	}

	ci.listener.register()
	return ci, nil
}