in go/protocol/publisher.go [29:91]
// build assembles the MQTT message to publish for msg.
//
// The topic is resolved from p.topic using topicTokens when a topic pattern is
// configured; otherwise it is left empty. The message is published at QoS 1
// with the message expiry taken from timeout. When msg is non-nil its payload
// is serialized with p.encoding, and its correlation data (if any) must parse
// as a UUID. The user properties are the message metadata plus the source ID,
// HLC timestamp, and protocol version, which overwrite any caller-supplied
// entries under the same keys.
//
// The caller's msg.Metadata map is copied, never modified.
//
// It returns an error if topic resolution, serialization, or reading the HLC
// fails, or an *errors.Client with kind InternalLogicError if the correlation
// data is not a valid UUID.
func (p *publisher[T]) build(
	msg *Message[T],
	topicTokens map[string]string,
	timeout *internal.Timeout,
) (*mqtt.Message, error) {
	pub := &mqtt.Message{}

	if p.topic != nil {
		topic, err := p.topic.Topic(topicTokens)
		if err != nil {
			return nil, err
		}
		pub.Topic = topic
	}

	pub.PublishOptions = mqtt.PublishOptions{
		QoS:           1,
		MessageExpiry: timeout.MessageExpiry(),
	}

	var metadata map[string]string
	if msg != nil {
		data, err := serialize(p.encoding, msg.Payload)
		if err != nil {
			return nil, err
		}
		pub.Payload = data.Payload
		pub.ContentType = data.ContentType
		pub.PayloadFormat = data.PayloadFormat

		if msg.CorrelationData != "" {
			correlationData, err := uuid.Parse(msg.CorrelationData)
			if err != nil {
				return nil, &errors.Client{
					Message: "correlation data is not a valid UUID",
					Kind: errors.InternalLogicError{
						PropertyName: "CorrelationData",
					},
					Nested: err,
				}
			}
			pub.CorrelationData = correlationData[:]
		}

		metadata = msg.Metadata
	}

	ts, err := p.app.hlc.Get()
	if err != nil {
		return nil, err
	}

	// Copy the metadata instead of aliasing it: the reserved properties added
	// below must not leak back into the caller's map, which may be reused for
	// later messages or shared between goroutines.
	const reservedProperties = 3
	props := make(map[string]string, len(metadata)+reservedProperties)
	for key, value := range metadata {
		props[key] = value
	}
	props[constants.SourceID] = p.client.ID()
	props[constants.Timestamp] = ts.String()
	props[constants.ProtocolVersion] = p.version
	pub.UserProperties = props

	return pub, nil
}