func()

in plugin/connector/rocketmq/consumer.go [186:221]


// handle converts the first message of msg into a CloudEvent and hands it to
// the consumer's listener together with a commit callback.
//
// Only msg[0] is processed; any further messages in the batch are ignored
// (NOTE(review): presumably the consumer delivers one message per batch —
// confirm against the consumer configuration). An empty batch is reported as
// consumer.ConsumeSuccess.
//
// Before conversion the message's born and store timestamps are copied into
// its properties and convert.TransferMessageSystemProperties is applied.
//
// The returned result is the one selected by the listener through the commit
// callback, and stays consumer.ConsumeSuccess if the callback has not been
// invoked by the time Consume returns. The returned error is always nil.
func (h *BroadCastingMessageSubscribeHandler) handle(ctx context.Context, msg ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	if len(msg) == 0 {
		return consumer.ConsumeSuccess, nil
	}
	if ctx == nil {
		// Keep working for callers that pass no context at all.
		ctx = context.Background()
	}

	messageExt := msg[0]
	messageExt.WithProperty(constants.PropertyMessageBornTimestamp,
		strconv.FormatInt(messageExt.BornTimestamp, 10))
	messageExt.WithProperty(constants.PropertyMessageStoreTimestamp,
		strconv.FormatInt(messageExt.StoreTimestamp, 10))

	message := &messageExt.Message
	convert.TransferMessageSystemProperties(message)

	// Propagate the caller's context instead of a fresh background context so
	// that cancellation, deadlines and values attached by the caller are
	// visible to the conversion.
	event, err := convert.NewRocketMQMessageReader(message).ToCloudEvent(ctx)
	if err != nil {
		// A message that cannot be converted is acknowledged and dropped: the
		// conversion error is intentionally not returned to the caller.
		// TODO: surface err (e.g. log it) so dropped messages are observable.
		return consumer.ConsumeSuccess, nil
	}

	// consumeResult is captured by commitFunction; the listener picks the
	// final outcome by calling it with the desired action. Unknown actions
	// leave the current value untouched.
	consumeResult := consumer.ConsumeSuccess
	commitFunction := func(action connector.EventMeshAction) error {
		switch action {
		case connector.CommitMessage:
			consumeResult = consumer.ConsumeSuccess
		case connector.ReconsumeLater:
			consumeResult = consumer.ConsumeRetryLater
		case connector.ManualAck:
			// currently, RocketMQ go client doesn't support manual offset updating, so just commit message here
			// TODO support manual offset updating
			consumeResult = consumer.ConsumeSuccess
		}
		return nil
	}
	// NOTE(review): this assumes Consume invokes commitFunction synchronously;
	// an asynchronous call would race with the read below — confirm.
	h.consumer.listener.Consume(event, commitFunction)
	return consumeResult, nil
}