func()

in runtime/core/protocol/grpc/consumer/consumer_mesh.go [208:251]


func (e *eventMeshConsumer) createEventListener(mode pb.Subscription_SubscriptionItem_SubscriptionMode) *connector.EventListener {
	return &connector.EventListener{
		Consume: func(event *cloudv2.Event, commitFunc connector.CommitFunc) error {
			var commitAction connector.EventMeshAction
			defer commitFunc(commitAction)

			eventclone := event.Clone()
			eventclone.SetExtension(consts.REQ_MQ2EVENTMESH_TIMESTAMP, time.Now().UnixMilli())
			topic := event.Subject()
			bizSeqNo := eventclone.Extensions()[consts.PROPERTY_MESSAGE_SEARCH_KEYS]
			uniqueID := eventclone.Extensions()[consts.RMB_UNIQ_ID]
			log.Infof("mq to eventmesh, topic:%v, bizSeqNo:%v, uniqueID:%v", topic, bizSeqNo, uniqueID)

			val, ok := e.consumerGroupTopicConfig.Load(topic)
			if !ok {
				log.Debugf("no active consumer for topic:%v", topic)
				commitAction = connector.CommitMessage
				return nil
			}

			topicConfig := val.(ConsumerGroupTopicOption)
			tpy := topicConfig.GRPCType()
			mctx := &MessageContext{
				GrpcType:         tpy,
				ConsumerGroup:    e.ConsumerGroup,
				SubscriptionMode: mode,
				Event:            &eventclone,
				TopicConfig:      topicConfig,
			}
			if err := e.messageHandler.Handler(mctx); err != nil {
				log.Warnf("handle msg err:%v, topic:%v, group:%v", err, topic, topicConfig.ConsumerGroup)
				// can not handle the message due to the capacity limit is reached
				// wait for 5 seconds and send this message back to mq and consume again
				time.Sleep(time.Second * 5)
				//e.sendMessageBack()
				commitAction = connector.CommitMessage
				return nil
			}

			commitAction = connector.ManualAck
			return nil
		},
	}
}