in runtime/core/protocol/grpc/consumer/message_handler.go [70:99]
func (m *messageHandler) Handler(mctx *MessageContext) error {
if m.Size() > ConsumerGroupWaitingRequestThreshold {
log.Warnf("too many request, reject and send back to MQ, group:%v, threshold:%v",
mctx.ConsumerGroup, ConsumerGroupWaitingRequestThreshold)
return ErrRequestReachMaxThreshold
}
var (
try func() error
)
if mctx.GrpcType == consts.WEBHOOK {
req, err := NewWebhookRequest(mctx)
if err != nil {
return err
}
try = req.Try
} else {
req, err := NewStreamRequest(mctx)
if err != nil {
return err
}
try = req.Try
}
go func() {
if err := try(); err != nil {
log.Warnf("failed to handle msg, group:%v, topic:%v, err:%v", mctx.ConsumerGroup, mctx.TopicConfig, err)
}
}()
return nil
}