in go/protocol/telemetry_receiver.go [76:146]
func NewTelemetryReceiver[T any](
app *Application,
client MqttClient,
encoding Encoding[T],
topicPattern string,
handler TelemetryHandler[T],
opt ...TelemetryReceiverOption,
) (tr *TelemetryReceiver[T], err error) {
var opts TelemetryReceiverOptions
opts.Apply(opt)
logger := log.Wrap(opts.Logger, app.log)
defer func() { err = errutil.Return(err, logger, true) }()
if err := errutil.ValidateNonNil(map[string]any{
"client": client,
"encoding": encoding,
"handler": handler,
}); err != nil {
return nil, err
}
to := &internal.Timeout{
Duration: opts.Timeout,
Name: "ExecutionTimeout",
Text: telemetryReceiverErrStr,
}
if err := to.Validate(); err != nil {
return nil, err
}
if err := internal.ValidateShareName(opts.ShareName); err != nil {
return nil, err
}
tp, err := internal.NewTopicPattern(
"topicPattern",
topicPattern,
opts.TopicTokens,
opts.TopicNamespace,
)
if err != nil {
return nil, err
}
tf, err := tp.Filter()
if err != nil {
return nil, err
}
tr = &TelemetryReceiver[T]{
handler: handler,
manualAck: opts.ManualAck,
timeout: to,
log: logger,
}
tr.listener = &listener[T]{
app: app,
client: client,
encoding: encoding,
topic: tf,
shareName: opts.ShareName,
concurrency: opts.Concurrency,
supportedVersion: version.TelemetrySupported,
log: logger,
handler: tr,
}
tr.listener.register()
return tr, nil
}