func NewTelemetryReceiver[T any]()

in go/protocol/telemetry_receiver.go [76:146]


func NewTelemetryReceiver[T any](
	app *Application,
	client MqttClient,
	encoding Encoding[T],
	topicPattern string,
	handler TelemetryHandler[T],
	opt ...TelemetryReceiverOption,
) (tr *TelemetryReceiver[T], err error) {
	var opts TelemetryReceiverOptions
	opts.Apply(opt)

	logger := log.Wrap(opts.Logger, app.log)
	defer func() { err = errutil.Return(err, logger, true) }()

	if err := errutil.ValidateNonNil(map[string]any{
		"client":   client,
		"encoding": encoding,
		"handler":  handler,
	}); err != nil {
		return nil, err
	}

	to := &internal.Timeout{
		Duration: opts.Timeout,
		Name:     "ExecutionTimeout",
		Text:     telemetryReceiverErrStr,
	}
	if err := to.Validate(); err != nil {
		return nil, err
	}

	if err := internal.ValidateShareName(opts.ShareName); err != nil {
		return nil, err
	}

	tp, err := internal.NewTopicPattern(
		"topicPattern",
		topicPattern,
		opts.TopicTokens,
		opts.TopicNamespace,
	)
	if err != nil {
		return nil, err
	}

	tf, err := tp.Filter()
	if err != nil {
		return nil, err
	}

	tr = &TelemetryReceiver[T]{
		handler:   handler,
		manualAck: opts.ManualAck,
		timeout:   to,
		log:       logger,
	}
	tr.listener = &listener[T]{
		app:              app,
		client:           client,
		encoding:         encoding,
		topic:            tf,
		shareName:        opts.ShareName,
		concurrency:      opts.Concurrency,
		supportedVersion: version.TelemetrySupported,
		log:              logger,
		handler:          tr,
	}

	tr.listener.register()
	return tr, nil
}