in pulsar/consumer_impl.go [540:598]
func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
if delay < 0 {
delay = 0
}
if !checkMessageIDType(msg.ID()) {
c.log.Warnf("invalid message id type %T", msg.ID())
return
}
msgID := c.messageID(msg.ID())
if msgID == nil {
return
}
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
}
for k, v := range customProperties {
props[k] = v
}
reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
reconsumeTimes++
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: props,
DeliverAfter: delay,
},
}
}
}