func()

in pulsar/consumer_impl.go [598:656]


func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
	delay time.Duration) {
	if delay < 0 {
		delay = 0
	}

	if !checkMessageIDType(msg.ID()) {
		c.log.Warnf("invalid message id type %T", msg.ID())
		return
	}

	msgID := c.messageID(msg.ID())
	if msgID == nil {
		return
	}

	props := make(map[string]string)
	for k, v := range msg.Properties() {
		props[k] = v
	}

	for k, v := range customProperties {
		props[k] = v
	}

	reconsumeTimes := 1
	if s, ok := props[SysPropertyReconsumeTimes]; ok {
		reconsumeTimes, _ = strconv.Atoi(s)
		reconsumeTimes++
	} else {
		props[SysPropertyRealTopic] = msg.Topic()
		props[SysPropertyOriginMessageID] = msgID.messageID.String()
	}
	props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
	props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)

	consumerMsg := ConsumerMessage{
		Consumer: c,
		Message: &message{
			payLoad:    msg.Payload(),
			properties: props,
			msgID:      msgID,
		},
	}
	if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
		c.dlq.Chan() <- consumerMsg
	} else {
		c.rlq.Chan() <- RetryMessage{
			consumerMsg: consumerMsg,
			producerMsg: ProducerMessage{
				Payload:      msg.Payload(),
				Key:          msg.Key(),
				OrderingKey:  msg.OrderingKey(),
				Properties:   props,
				DeliverAfter: delay,
			},
		}
	}
}