in runtime/core/protocol/grpc/consumer/consumer_mesh.go [208:251]
func (e *eventMeshConsumer) createEventListener(mode pb.Subscription_SubscriptionItem_SubscriptionMode) *connector.EventListener {
return &connector.EventListener{
Consume: func(event *cloudv2.Event, commitFunc connector.CommitFunc) error {
var commitAction connector.EventMeshAction
defer commitFunc(commitAction)
eventclone := event.Clone()
eventclone.SetExtension(consts.REQ_MQ2EVENTMESH_TIMESTAMP, time.Now().UnixMilli())
topic := event.Subject()
bizSeqNo := eventclone.Extensions()[consts.PROPERTY_MESSAGE_SEARCH_KEYS]
uniqueID := eventclone.Extensions()[consts.RMB_UNIQ_ID]
log.Infof("mq to eventmesh, topic:%v, bizSeqNo:%v, uniqueID:%v", topic, bizSeqNo, uniqueID)
val, ok := e.consumerGroupTopicConfig.Load(topic)
if !ok {
log.Debugf("no active consumer for topic:%v", topic)
commitAction = connector.CommitMessage
return nil
}
topicConfig := val.(ConsumerGroupTopicOption)
tpy := topicConfig.GRPCType()
mctx := &MessageContext{
GrpcType: tpy,
ConsumerGroup: e.ConsumerGroup,
SubscriptionMode: mode,
Event: &eventclone,
TopicConfig: topicConfig,
}
if err := e.messageHandler.Handler(mctx); err != nil {
log.Warnf("handle msg err:%v, topic:%v, group:%v", err, topic, topicConfig.ConsumerGroup)
// can not handle the message due to the capacity limit is reached
// wait for 5 seconds and send this message back to mq and consume again
time.Sleep(time.Second * 5)
//e.sendMessageBack()
commitAction = connector.CommitMessage
return nil
}
commitAction = connector.ManualAck
return nil
},
}
}