func()

in go/protocol/publisher.go [29:91]


func (p *publisher[T]) build(
	msg *Message[T],
	topicTokens map[string]string,
	timeout *internal.Timeout,
) (*mqtt.Message, error) {
	pub := &mqtt.Message{}
	var err error

	if p.topic != nil {
		pub.Topic, err = p.topic.Topic(topicTokens)
		if err != nil {
			return nil, err
		}
	}

	pub.PublishOptions = mqtt.PublishOptions{
		QoS:           1,
		MessageExpiry: timeout.MessageExpiry(),
	}

	if msg != nil {
		data, err := serialize(p.encoding, msg.Payload)
		if err != nil {
			return nil, err
		}

		pub.Payload = data.Payload
		pub.ContentType = data.ContentType
		pub.PayloadFormat = data.PayloadFormat

		if msg.CorrelationData != "" {
			correlationData, err := uuid.Parse(msg.CorrelationData)
			if err != nil {
				return nil, &errors.Client{
					Message: "correlation data is not a valid UUID",
					Kind: errors.InternalLogicError{
						PropertyName: "CorrelationData",
					},
					Nested: err,
				}
			}
			pub.CorrelationData = correlationData[:]
		}

		if msg.Metadata != nil {
			pub.UserProperties = msg.Metadata
		} else {
			pub.UserProperties = map[string]string{}
		}
	} else {
		pub.UserProperties = map[string]string{}
	}

	ts, err := p.app.hlc.Get()
	if err != nil {
		return nil, err
	}
	pub.UserProperties[constants.SourceID] = p.client.ID()
	pub.UserProperties[constants.Timestamp] = ts.String()
	pub.UserProperties[constants.ProtocolVersion] = p.version

	return pub, nil
}