in consumer/push_consumer.go [1083:1223]
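// consumeMessageConcurrently splits the messages cached in the process queue
// into batches and dispatches each batch to its own consume goroutine, bounded
// per topic by ConsumeGoroutineNums.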
func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}
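	// An optional user-supplied rate limiter throttles how fast batches are dispatched.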
	limiter := pc.option.Limiter
	limiterOn := limiter != nil
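	// Lazily create a per-topic buffered channel that acts as a semaphore,
	// capping the number of in-flight consume goroutines for this topic.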
	if _, ok := pc.crCh[mq.Topic]; !ok {
		pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
	}
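	// Slice the fetched messages into batches of at most ConsumeMessageBatchMaxSize.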
	for count := 0; count < len(msgs); count++ {
		var subMsgs []*primitive.MessageExt
		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
			subMsgs = msgs[count:]
			count = len(msgs)
		} else {
			next := count + pc.option.ConsumeMessageBatchMaxSize
			subMsgs = msgs[count:next]
			count = next - 1
		}
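		// Apply the rate limit once per batch, keyed by the topic without its namespace prefix.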
		if limiterOn {
			limiter(utils.WithoutNamespace(mq.Topic))
		}
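		// Acquire a semaphore slot; this blocks once ConsumeGoroutineNums
		// goroutines are already consuming for this topic.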
		pc.crCh[mq.Topic] <- struct{}{}
		go primitive.WithRecover(func() {
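			// Always release the semaphore slot on exit, logging any panic from the handler first.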
			defer func() {
				if err := recover(); err != nil {
					if primitive.DefaultPanicHandler != nil {
						primitive.DefaultPanicHandler(err)
					}
					rlog.Error("consumeMessageConcurrently panic", map[string]interface{}{
						rlog.LogKeyUnderlayError: err,
						rlog.LogKeyStack:         utils.GetStackAsString(false),
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
				}
				<-pc.crCh[mq.Topic]
			}()
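			// RETRY re-runs this batch locally when sending failed messages back to the broker fails.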
		RETRY:
			if pq.IsDroppd() {
				rlog.Info("the message queue was dropped, so its messages cannot be consumed", map[string]interface{}{
					rlog.LogKeyMessageQueue:  mq.String(),
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}
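			// Record the consume start time; resetRetryAndNamespace (per the helper's
			// name) normalizes retry-topic and namespace-prefixed messages beforehand.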
			beginTime := time.Now()
			pc.resetRetryAndNamespace(subMsgs)
			var result ConsumeResult
			var err error
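			// Assemble the consume context that travels with ctx for hooks and interceptors.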
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          subMsgs,
			}
			ctx := context.Background()
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
			concurrentCtx := primitive.NewConsumeConcurrentlyContext()
			concurrentCtx.MQ = *mq
			ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
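			// Hand the batch to the registered message handler, then classify the
			// outcome (error, timeout, success, or retry) for the context properties.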
			result, err = pc.consumeInner(ctx, subMsgs)
			consumeRT := time.Since(beginTime)
			if err != nil {
				rlog.Warning("consumeMessageConcurrently error", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
					rlog.LogKeyMessages:      subMsgs,
					rlog.LogKeyMessageQueue:  mq,
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
			} else if consumeRT >= pc.option.ConsumeTimeout {
				rlog.Warning("consumeMessageConcurrently timed out", map[string]interface{}{
					rlog.LogKeyMessages:      subMsgs,
					rlog.LogKeyMessageQueue:  mq,
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
			} else if result == ConsumeSuccess {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
			} else if result == ConsumeRetryLater {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
			}
			pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
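			// Only act on the result if the queue is still owned; a concurrent
			// rebalance may have dropped it while the handler ran.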
			if !pq.IsDroppd() {
				msgBackFailed := make([]*primitive.MessageExt, 0)
				msgBackSucceed := make([]*primitive.MessageExt, 0)
				if result == ConsumeSuccess {
					pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
					msgBackSucceed = subMsgs
				} else {
					pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
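					// In broadcasting mode, failed messages are simply dropped; in
					// clustering mode they are sent back to the broker for redelivery.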
					if pc.model == BroadCasting {
						for i := 0; i < len(subMsgs); i++ {
							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
								"message": subMsgs[i],
							})
						}
					} else {
						for i := 0; i < len(subMsgs); i++ {
							msg := subMsgs[i]
							if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
								msgBackSucceed = append(msgBackSucceed, msg)
							} else {
								msg.ReconsumeTimes += 1
								msgBackFailed = append(msgBackFailed, msg)
							}
						}
					}
				}
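				// Remove the handled messages from the process queue and persist
				// the new consume offset, provided the queue is still owned.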
				offset := pq.removeMessage(msgBackSucceed...)
				if offset >= 0 && !pq.IsDroppd() {
					pc.storage.update(mq, int64(offset), true)
				}
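				// Messages that could not be sent back to the broker are retried
				// locally after a fixed delay.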
				if len(msgBackFailed) > 0 {
					subMsgs = msgBackFailed
					time.Sleep(5 * time.Second)
					goto RETRY
				}
			} else {
				rlog.Warning("processQueue was dropped, the consume result will not be processed", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq,
					"message":               subMsgs,
				})
			}
		})
	}
}