go/protocol/telemetry_receiver.go (218 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package protocol import ( "context" "fmt" "log/slog" "time" "github.com/Azure/iot-operations-sdks/go/internal/log" "github.com/Azure/iot-operations-sdks/go/internal/mqtt" "github.com/Azure/iot-operations-sdks/go/internal/options" "github.com/Azure/iot-operations-sdks/go/protocol/errors" "github.com/Azure/iot-operations-sdks/go/protocol/internal" "github.com/Azure/iot-operations-sdks/go/protocol/internal/errutil" "github.com/Azure/iot-operations-sdks/go/protocol/internal/version" ) type ( // TelemetryReceiver provides the ability to handle the receipt of a single // telemetry. TelemetryReceiver[T any] struct { listener *listener[T] handler TelemetryHandler[T] manualAck bool timeout *internal.Timeout log log.Logger } // TelemetryReceiverOption represents a single telemetry receiver option. TelemetryReceiverOption interface { telemetryReceiver(*TelemetryReceiverOptions) } // TelemetryReceiverOptions are the resolved telemetry receiver options. TelemetryReceiverOptions struct { ManualAck bool Concurrency uint Timeout time.Duration ShareName string TopicNamespace string TopicTokens map[string]string Logger *slog.Logger } // TelemetryHandler is the user-provided implementation of a single // telemetry event handler. It is treated as blocking; all parallelism is // handled by the library. This *must* be thread-safe. TelemetryHandler[T any] = func(context.Context, *TelemetryMessage[T]) error // TelemetryMessage contains per-message data and methods that are exposed // to the telemetry handlers. TelemetryMessage[T any] struct { Message[T] // Ack provides a function to manually ack if enabled and if possible; // it will be nil otherwise. Note that, since QoS 0 messages cannot be // acked, this will be nil in this case even if manual ack is enabled. Ack func() } // WithManualAck indicates that the handler is responsible for manually // acking the telemetry message. WithManualAck bool ) const ( telemetryReceiverComponentName = "telemetry receiver" telemetryReceiverErrStr = "telemetry receipt" ) // NewTelemetryReceiver creates a new telemetry receiver. func NewTelemetryReceiver[T any]( app *Application, client MqttClient, encoding Encoding[T], topicPattern string, handler TelemetryHandler[T], opt ...TelemetryReceiverOption, ) (tr *TelemetryReceiver[T], err error) { var opts TelemetryReceiverOptions opts.Apply(opt) logger := log.Wrap(opts.Logger, app.log) defer func() { err = errutil.Return(err, logger, true) }() if err := errutil.ValidateNonNil(map[string]any{ "client": client, "encoding": encoding, "handler": handler, }); err != nil { return nil, err } to := &internal.Timeout{ Duration: opts.Timeout, Name: "ExecutionTimeout", Text: telemetryReceiverErrStr, } if err := to.Validate(); err != nil { return nil, err } if err := internal.ValidateShareName(opts.ShareName); err != nil { return nil, err } tp, err := internal.NewTopicPattern( "topicPattern", topicPattern, opts.TopicTokens, opts.TopicNamespace, ) if err != nil { return nil, err } tf, err := tp.Filter() if err != nil { return nil, err } tr = &TelemetryReceiver[T]{ handler: handler, manualAck: opts.ManualAck, timeout: to, log: logger, } tr.listener = &listener[T]{ app: app, client: client, encoding: encoding, topic: tf, shareName: opts.ShareName, concurrency: opts.Concurrency, supportedVersion: version.TelemetrySupported, log: logger, handler: tr, } tr.listener.register() return tr, nil } // Start listening to the MQTT telemetry topic. func (tr *TelemetryReceiver[T]) Start(ctx context.Context) error { return tr.listener.start(ctx, telemetryReceiverComponentName) } // Close the telemetry receiver to free its resources. func (tr *TelemetryReceiver[T]) Close() { tr.listener.close(telemetryReceiverComponentName) } func (tr *TelemetryReceiver[T]) onMsg( ctx context.Context, pub *mqtt.Message, msg *Message[T], ) error { tr.log.Debug(ctx, "telemetry received", slog.String("topic", pub.Topic)) message := &TelemetryMessage[T]{Message: *msg} var err error message.Payload, err = tr.listener.payload(msg) if err != nil { return err } if tr.manualAck && pub.QoS > 0 { message.Ack = pub.Ack } handlerCtx, cancel := tr.timeout.Context(ctx) defer cancel() if err := tr.handle(handlerCtx, message); err != nil { return err } tr.ack(ctx, pub) return nil } func (tr *TelemetryReceiver[T]) onErr( ctx context.Context, pub *mqtt.Message, err error, ) error { defer tr.ack(ctx, pub) // Strip off any no-return flags (without extra logging, since we're about // to drop the message anyways). if no, e := errutil.IsNoReturn(err); no { return e } return err } // Call handler with panic catch. func (tr *TelemetryReceiver[T]) handle( ctx context.Context, msg *TelemetryMessage[T], ) error { rchan := make(chan error) // TODO: This goroutine will leak if the handler blocks without respecting // the context. This is a known limitation to align to the C# behavior, and // should be changed if that behavior is revisited. go func() { var err error defer func() { if ePanic := recover(); ePanic != nil { err = &errors.Remote{ Message: fmt.Sprint(ePanic), Kind: errors.ExecutionError{}, } } select { case rchan <- err: case <-ctx.Done(): } }() err = tr.handler(ctx, msg) if e := errutil.Context(ctx, telemetryReceiverErrStr); e != nil { // An error from the context overrides any return value. err = e } else if err != nil { err = &errors.Remote{ Message: err.Error(), Kind: errors.ExecutionError{}, } } }() select { case err := <-rchan: return err case <-ctx.Done(): return errutil.Context(ctx, telemetryReceiverErrStr) } } // Ack the telemetry if automatic and log it. func (tr *TelemetryReceiver[T]) ack( ctx context.Context, pub *mqtt.Message, ) { if !tr.manualAck && pub.QoS > 0 { pub.Ack() tr.log.Debug(ctx, "telemetry acked", slog.String("topic", pub.Topic)) } } // Apply resolves the provided list of options. func (o *TelemetryReceiverOptions) Apply( opts []TelemetryReceiverOption, rest ...TelemetryReceiverOption, ) { for opt := range options.Apply[TelemetryReceiverOption](opts, rest...) { opt.telemetryReceiver(o) } } // ApplyOptions filters and resolves the provided list of options. func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option) { for opt := range options.Apply[TelemetryReceiverOption](opts, rest...) { opt.telemetryReceiver(o) } } func (o *TelemetryReceiverOptions) telemetryReceiver( opt *TelemetryReceiverOptions, ) { if o != nil { *opt = *o } } func (*TelemetryReceiverOptions) option() {} func (o WithManualAck) telemetryReceiver(opt *TelemetryReceiverOptions) { opt.ManualAck = bool(o) } func (WithManualAck) option() {}