go/protocol/publisher.go (83 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package protocol
import (
"context"
"time"
"github.com/Azure/iot-operations-sdks/go/internal/mqtt"
"github.com/Azure/iot-operations-sdks/go/protocol/errors"
"github.com/Azure/iot-operations-sdks/go/protocol/internal"
"github.com/Azure/iot-operations-sdks/go/protocol/internal/constants"
"github.com/Azure/iot-operations-sdks/go/protocol/internal/errutil"
"github.com/google/uuid"
)
// publisher provides the shared implementation details for the MQTT
// publishers. T is the payload type of the messages it builds, which is
// serialized using encoding.
type publisher[T any] struct {
// app provides the hlc that build reads to timestamp each message.
app *Application
// client supplies the source ID stamped on each message and performs the
// actual publish.
client MqttClient
// encoding is used to serialize the message payload.
encoding Encoding[T]
// topic is the pattern resolved against the topic tokens to produce the
// publish topic. When nil, build leaves the message topic unset.
topic *internal.TopicPattern
// version is sent as the protocol version user property on each message.
version string
}
// DefaultTimeout is the timeout applied to Invoke or Send if none is specified.
// Its value is 10 seconds.
const DefaultTimeout = 10 * time.Second
// build assembles the MQTT message for msg without publishing it.
//
// The topic is resolved from the publisher's topic pattern using topicTokens;
// if the publisher has no pattern, the topic is left unset. The message is
// given QoS 1 and the message expiry reported by timeout.
//
// When msg is non-nil, its payload is serialized with the publisher's
// encoding, its correlation data (if non-empty) must parse as a UUID and is
// attached as the UUID's raw bytes, and its metadata seeds the user
// properties. The source ID (the client ID), a timestamp from the
// application's hlc, and the protocol version are then always written to the
// user properties, replacing any metadata entries stored under the same keys.
//
// msg.Metadata is copied rather than aliased, so the caller's map is never
// modified and may safely be reused for later messages.
//
// An error is returned if the topic cannot be resolved, the payload cannot be
// serialized, the correlation data is not a valid UUID, or the timestamp
// cannot be obtained. On error the returned message is nil.
func (p *publisher[T]) build(
	msg *Message[T],
	topicTokens map[string]string,
	timeout *internal.Timeout,
) (*mqtt.Message, error) {
	// Number of user properties this method always adds on top of the
	// caller's metadata; used only to pre-size the map.
	const systemProperties = 3

	pub := &mqtt.Message{}

	if p.topic != nil {
		topic, err := p.topic.Topic(topicTokens)
		if err != nil {
			return nil, err
		}
		pub.Topic = topic
	}

	pub.PublishOptions = mqtt.PublishOptions{
		QoS:           1,
		MessageExpiry: timeout.MessageExpiry(),
	}

	// Stays nil when there is no message or the message has no metadata;
	// ranging over a nil map below is a no-op.
	var metadata map[string]string

	if msg != nil {
		data, err := serialize(p.encoding, msg.Payload)
		if err != nil {
			return nil, err
		}
		pub.Payload = data.Payload
		pub.ContentType = data.ContentType
		pub.PayloadFormat = data.PayloadFormat

		if msg.CorrelationData != "" {
			correlationData, err := uuid.Parse(msg.CorrelationData)
			if err != nil {
				return nil, &errors.Client{
					Message: "correlation data is not a valid UUID",
					Kind: errors.InternalLogicError{
						PropertyName: "CorrelationData",
					},
					Nested: err,
				}
			}
			pub.CorrelationData = correlationData[:]
		}

		metadata = msg.Metadata
	}

	// Build the user properties in a fresh map. Assigning msg.Metadata
	// directly would make the writes below land in the caller's map.
	pub.UserProperties = make(
		map[string]string,
		len(metadata)+systemProperties,
	)
	for key, value := range metadata {
		pub.UserProperties[key] = value
	}

	ts, err := p.app.hlc.Get()
	if err != nil {
		return nil, err
	}

	// Written last so these values win over same-named metadata entries.
	pub.UserProperties[constants.SourceID] = p.client.ID()
	pub.UserProperties[constants.Timestamp] = ts.String()
	pub.UserProperties[constants.ProtocolVersion] = p.version

	return pub, nil
}
// publish hands the message's topic, payload, and publish options to the
// client and converts the resulting ack/error pair into a single error via
// errutil.Mqtt, labelled as a "publish" operation.
func (p *publisher[T]) publish(ctx context.Context, msg *mqtt.Message) error {
	options := &msg.PublishOptions

	ack, err := p.client.Publish(ctx, msg.Topic, msg.Payload, options)

	return errutil.Mqtt(ctx, "publish", ack, err)
}