in plugin/connector/rocketmq/consumer.go [140:175]
// handle is the consume callback for clustering-mode subscriptions. It converts
// the first delivered RocketMQ message into a CloudEvent, passes it to the
// registered listener, and returns the consume result the listener selected
// through the commit callback.
//
// Only msg[0] is processed; any further messages in the same batch are not
// looked at. NOTE(review): this presumably relies on the consumer being
// configured to deliver one message per callback — confirm against the
// consumer setup.
//
// An empty batch, and a message that cannot be converted into a CloudEvent,
// are both reported as consumer.ConsumeSuccess with a nil error.
func (h *ClusteringMessageSubscribeHandler) handle(ctx context.Context, msg ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	if len(msg) == 0 {
		return consumer.ConsumeSuccess, nil
	}

	messageExt := msg[0]

	// Copy the born/store timestamps into message properties before the
	// message is handed to the converter.
	messageExt.WithProperty(constants.PropertyMessageBornTimestamp,
		strconv.FormatInt(messageExt.BornTimestamp, 10))
	messageExt.WithProperty(constants.PropertyMessageStoreTimestamp,
		strconv.FormatInt(messageExt.StoreTimestamp, 10))

	message := &messageExt.Message
	convert.TransferMessageSystemProperties(message)

	// Forward the caller's context instead of a fresh background context so
	// that values, deadlines and cancellation attached by the caller are kept.
	event, err := convert.NewRocketMQMessageReader(message).ToCloudEvent(ctx)
	if err != nil {
		// The conversion error is dropped and the message is acknowledged.
		// NOTE(review): presumably this avoids endless redelivery of a message
		// that can never be decoded; consider logging err here.
		return consumer.ConsumeSuccess, nil
	}

	// consumeResult stays ConsumeSuccess unless the listener calls
	// commitFunction with a different action before Consume returns.
	consumeResult := consumer.ConsumeSuccess
	commitFunction := func(action connector.EventMeshAction) error {
		switch action {
		case connector.CommitMessage:
			consumeResult = consumer.ConsumeSuccess
		case connector.ReconsumeLater:
			consumeResult = consumer.ConsumeRetryLater
		case connector.ManualAck:
			// currently, RocketMQ go client doesn't support manual offset updating, so just commit message here
			// TODO support manual offset updating
			consumeResult = consumer.ConsumeSuccess
		}
		return nil
	}

	h.consumer.listener.Consume(event, commitFunction)
	return consumeResult, nil
}