go/protocol/telemetry_sender.go (158 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package protocol import ( "context" "log/slog" "time" "github.com/Azure/iot-operations-sdks/go/internal/log" "github.com/Azure/iot-operations-sdks/go/internal/options" "github.com/Azure/iot-operations-sdks/go/protocol/internal" "github.com/Azure/iot-operations-sdks/go/protocol/internal/errutil" "github.com/Azure/iot-operations-sdks/go/protocol/internal/version" ) type ( // TelemetrySender provides the ability to send a single telemetry. TelemetrySender[T any] struct { publisher *publisher[T] log log.Logger } // TelemetrySenderOption represents a single telemetry sender option. TelemetrySenderOption interface { telemetrySender(*TelemetrySenderOptions) } // TelemetrySenderOptions are the resolved telemetry sender options. TelemetrySenderOptions struct { TopicNamespace string TopicTokens map[string]string Logger *slog.Logger } // SendOption represent a single per-send option. SendOption interface{ send(*SendOptions) } // SendOptions are the resolved per-send options. SendOptions struct { CloudEvent *CloudEvent Retain bool Timeout time.Duration TopicTokens map[string]string Metadata map[string]string } // WithRetain indicates that the telemetry event should be retained by the // broker. WithRetain bool // This option is not used directly; see WithCloudEvent below. withCloudEvent struct{ *CloudEvent } ) const telemetrySenderErrStr = "telemetry send" // NewTelemetrySender creates a new telemetry sender. func NewTelemetrySender[T any]( app *Application, client MqttClient, encoding Encoding[T], topicPattern string, opt ...TelemetrySenderOption, ) (ts *TelemetrySender[T], err error) { var opts TelemetrySenderOptions opts.Apply(opt) logger := log.Wrap(opts.Logger, app.log) defer func() { err = errutil.Return(err, logger, true) }() if err := errutil.ValidateNonNil(map[string]any{ "client": client, "encoding": encoding, }); err != nil { return nil, err } tp, err := internal.NewTopicPattern( "topicPattern", topicPattern, opts.TopicTokens, opts.TopicNamespace, ) if err != nil { return nil, err } ts = &TelemetrySender[T]{ log: logger, } ts.publisher = &publisher[T]{ app: app, client: client, encoding: encoding, topic: tp, version: version.Telemetry, } return ts, nil } // Send emits the telemetry. This will block until the message is ack'd. func (ts *TelemetrySender[T]) Send( ctx context.Context, val T, opt ...SendOption, ) (err error) { var opts SendOptions opts.Apply(opt) shallow := true defer func() { err = errutil.Return(err, ts.log, shallow) }() timeout := opts.Timeout if timeout == 0 { timeout = DefaultTimeout } expiry := &internal.Timeout{ Duration: timeout, Name: "MessageExpiry", Text: telemetrySenderErrStr, } if err := expiry.Validate(); err != nil { return err } msg := &Message[T]{ Payload: val, Metadata: opts.Metadata, } pub, err := ts.publisher.build(msg, opts.TopicTokens, expiry) if err != nil { return err } if err := opts.CloudEvent.toMessage(pub); err != nil { return err } pub.Retain = opts.Retain shallow = false if err := ts.publisher.publish(ctx, pub); err != nil { return err } ts.log.Debug(ctx, "telemetry sent", slog.String("topic", pub.Topic)) return nil } // Apply resolves the provided list of options. func (o *TelemetrySenderOptions) Apply( opts []TelemetrySenderOption, rest ...TelemetrySenderOption, ) { for opt := range options.Apply[TelemetrySenderOption](opts, rest...) { opt.telemetrySender(o) } } // ApplyOptions filters and resolves the provided list of options. func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option) { for opt := range options.Apply[TelemetrySenderOption](opts, rest...) { opt.telemetrySender(o) } } func (o *TelemetrySenderOptions) telemetrySender(opt *TelemetrySenderOptions) { if o != nil { *opt = *o } } func (*TelemetrySenderOptions) option() {} // Apply resolves the provided list of options. func (o *SendOptions) Apply( opts []SendOption, rest ...SendOption, ) { for opt := range options.Apply[SendOption](opts, rest...) { opt.send(o) } } func (o *SendOptions) send(opt *SendOptions) { if o != nil { *opt = *o } } func (o WithRetain) send(opt *SendOptions) { opt.Retain = bool(o) } // WithCloudEvent adds a cloud event payload to the telemetry message. func WithCloudEvent(ce *CloudEvent) SendOption { return withCloudEvent{ce} } func (o withCloudEvent) send(opt *SendOptions) { opt.CloudEvent = o.CloudEvent } // Support CloudEvent used as an option directly for convenience. func (o *CloudEvent) send(opt *SendOptions) { opt.CloudEvent = o }