go/protocol/common_options.go (154 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package protocol
import (
"log/slog"
"maps"
"time"
)
type (
// WithConcurrency indicates how many handlers can execute in parallel.
WithConcurrency uint
// WithTimeout applies a context timeout to the message invocation or
// handler execution, as appropriate.
WithTimeout time.Duration
// WithShareName connects this listener to a shared MQTT subscription.
WithShareName string
// WithTopicTokens specifies topic token values.
WithTopicTokens map[string]string
// WithTopicTokenNamespace specifies a namespace that will be prepended to
// all previously-specified topic tokens. Topic tokens specified after this
// option will not be namespaced, allowing this to differentiate user tokens
// from system tokens.
WithTopicTokenNamespace string
// WithMetadata specifies user-provided metadata values.
WithMetadata map[string]string
// WithTopicNamespace specifies a namespace that will be prepended to the
// topic.
WithTopicNamespace string
// This option is not used directly; see WithLogger below.
withLogger struct{ *slog.Logger }
)
func (o WithConcurrency) commandExecutor(opt *CommandExecutorOptions) {
opt.Concurrency = uint(o)
}
func (WithConcurrency) option() {}
func (o WithConcurrency) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.Concurrency = uint(o)
}
func (o WithTimeout) commandExecutor(opt *CommandExecutorOptions) {
opt.Timeout = time.Duration(o)
}
func (o WithTimeout) invoke(opt *InvokeOptions) {
opt.Timeout = time.Duration(o)
}
func (WithTimeout) option() {}
func (o WithTimeout) send(opt *SendOptions) {
opt.Timeout = time.Duration(o)
}
func (o WithTimeout) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.Timeout = time.Duration(o)
}
func (o WithShareName) commandExecutor(opt *CommandExecutorOptions) {
opt.ShareName = string(o)
}
func (WithShareName) option() {}
func (o WithShareName) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.ShareName = string(o)
}
func (o WithTopicNamespace) commandExecutor(opt *CommandExecutorOptions) {
opt.TopicNamespace = string(o)
}
func (o WithTopicNamespace) commandInvoker(opt *CommandInvokerOptions) {
opt.TopicNamespace = string(o)
}
func (WithTopicNamespace) option() {}
func (o WithTopicNamespace) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.TopicNamespace = string(o)
}
func (o WithTopicNamespace) telemetrySender(opt *TelemetrySenderOptions) {
opt.TopicNamespace = string(o)
}
func (o WithTopicTokens) apply(tokens map[string]string) map[string]string {
if tokens == nil {
tokens = make(map[string]string, len(o))
}
maps.Copy(tokens, o)
return tokens
}
func (o WithTopicTokens) commandExecutor(opt *CommandExecutorOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokens) commandInvoker(opt *CommandInvokerOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokens) invoke(opt *InvokeOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (WithTopicTokens) option() {}
func (o WithTopicTokens) send(opt *SendOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokens) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokens) telemetrySender(opt *TelemetrySenderOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokenNamespace) apply(
tokens map[string]string,
) map[string]string {
result := make(map[string]string, len(tokens))
for token, value := range tokens {
result[string(o)+token] = value
}
return result
}
func (o WithTopicTokenNamespace) commandExecutor(opt *CommandExecutorOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokenNamespace) commandInvoker(opt *CommandInvokerOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokenNamespace) invoke(opt *InvokeOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (WithTopicTokenNamespace) option() {}
func (o WithTopicTokenNamespace) send(opt *SendOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokenNamespace) telemetryReceiver(
opt *TelemetryReceiverOptions,
) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithTopicTokenNamespace) telemetrySender(opt *TelemetrySenderOptions) {
opt.TopicTokens = o.apply(opt.TopicTokens)
}
func (o WithMetadata) apply(values map[string]string) map[string]string {
if values == nil {
values = make(map[string]string, len(o))
}
maps.Copy(values, o)
return values
}
func (o WithMetadata) invoke(opt *InvokeOptions) {
opt.Metadata = o.apply(opt.Metadata)
}
func (o WithMetadata) send(opt *SendOptions) {
opt.Metadata = o.apply(opt.Metadata)
}
func (o WithMetadata) respond(opt *RespondOptions) {
opt.Metadata = o.apply(opt.Metadata)
}
// WithLogger enables logging with the provided slog logger.
func WithLogger(logger *slog.Logger) interface {
Option
ApplicationOption
CommandExecutorOption
CommandInvokerOption
TelemetryReceiverOption
TelemetrySenderOption
} {
return withLogger{logger}
}
func (o withLogger) application(opt *ApplicationOptions) {
opt.Logger = o.Logger
}
func (o withLogger) commandExecutor(opt *CommandExecutorOptions) {
opt.Logger = o.Logger
}
func (o withLogger) commandInvoker(opt *CommandInvokerOptions) {
opt.Logger = o.Logger
}
func (withLogger) option() {}
func (o withLogger) telemetryReceiver(opt *TelemetryReceiverOptions) {
opt.Logger = o.Logger
}
func (o withLogger) telemetrySender(opt *TelemetrySenderOptions) {
opt.Logger = o.Logger
}