func()

in consumer/push_consumer.go [1083:1223]


// consumeMessageConcurrently takes the messages currently returned by
// pq.getMessages, splits them into batches of at most
// pc.option.ConsumeMessageBatchMaxSize and consumes every batch in its own
// goroutine.
//
// Concurrency per topic is bounded by a buffered channel kept in
// pc.crCh[mq.Topic] (capacity ConsumeGoroutineNums): a token is pushed before a
// worker goroutine is started and popped again when that worker finishes, so
// this method blocks while all slots are taken. If pc.option.Limiter is set it
// is invoked once per batch before a slot is acquired.
//
// For each batch the worker:
//   - stops immediately if pq has been dropped;
//   - calls pc.consumeInner and records the outcome in the consume context
//     properties and in the consumer statistics;
//   - on success removes the batch from pq and updates the stored offset;
//   - on failure in BroadCasting mode logs and drops the batch;
//   - on failure otherwise sends every message back to the broker; messages
//     that could not be sent back get their ReconsumeTimes incremented and are
//     consumed again locally after a 5 second pause.
func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}

	limiter := pc.option.Limiter
	limiterOn := limiter != nil

	// Resolve the per-topic slot channel once and let the workers capture the
	// channel value itself, so that no worker goroutine has to read the shared
	// pc.crCh map while another caller may be inserting into it.
	// NOTE(review): the lookup/insert below is not guarded by any lock visible
	// in this function — confirm callers serialize access to pc.crCh.
	slots, ok := pc.crCh[mq.Topic]
	if !ok {
		slots = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
		pc.crCh[mq.Topic] = slots
	}

	for count := 0; count < len(msgs); count++ {
		// Cut the next batch; count is left on the last index consumed so the
		// loop's own count++ moves to the first message of the following batch.
		var subMsgs []*primitive.MessageExt
		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
			subMsgs = msgs[count:]
			count = len(msgs)
		} else {
			next := count + pc.option.ConsumeMessageBatchMaxSize
			subMsgs = msgs[count:next]
			count = next - 1
		}

		if limiterOn {
			limiter(utils.WithoutNamespace(mq.Topic))
		}
		// Blocks until a consume slot for this topic is free.
		slots <- struct{}{}

		go primitive.WithRecover(func() {
			defer func() {
				if err := recover(); err != nil {
					if primitive.DefaultPanicHandler != nil {
						primitive.DefaultPanicHandler(err)
					}
					rlog.Error("consumeMessageConcurrently panic", map[string]interface{}{
						rlog.LogKeyUnderlayError: err,
						rlog.LogKeyStack:         utils.GetStackAsString(false),
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
				}
				// Always give the slot back, even after a panic.
				<-slots
			}()
		RETRY:
			if pq.IsDroppd() {
				rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
					rlog.LogKeyMessageQueue:  mq.String(),
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}

			beginTime := time.Now()
			pc.resetRetryAndNamespace(subMsgs)

			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          subMsgs,
			}
			ctx := context.Background()
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
			concurrentCtx := primitive.NewConsumeConcurrentlyContext()
			concurrentCtx.MQ = *mq
			ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

			result, err := pc.consumeInner(ctx, subMsgs)

			consumeRT := time.Since(beginTime)
			// Classify the outcome; the warnings report the batch that was
			// actually handed to the consumer, not the whole pulled set.
			switch {
			case err != nil:
				rlog.Warning("consumeMessageCurrently error", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
					rlog.LogKeyMessages:      subMsgs,
					rlog.LogKeyMessageQueue:  mq,
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
			case consumeRT >= pc.option.ConsumeTimeout:
				rlog.Warning("consumeMessageCurrently time out", map[string]interface{}{
					rlog.LogKeyMessages:      subMsgs,
					rlog.LogKeyMessageQueue:  mq,
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
			case result == ConsumeSuccess:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
			case result == ConsumeRetryLater:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
			}

			pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))

			if pq.IsDroppd() {
				rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq,
					"message":               subMsgs,
				})
				return
			}

			var msgBackFailed []*primitive.MessageExt
			msgBackSucceed := make([]*primitive.MessageExt, 0)
			switch {
			case result == ConsumeSuccess:
				pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				msgBackSucceed = subMsgs
			case pc.model == BroadCasting:
				pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				for _, msg := range subMsgs {
					rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
						"message": msg,
					})
				}
				// The batch is dropped, not retried, so it has to be handed to
				// removeMessage like a consumed batch; otherwise these messages
				// would never be passed to removeMessage at all.
				msgBackSucceed = subMsgs
			default:
				pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				for _, msg := range subMsgs {
					if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
						msgBackSucceed = append(msgBackSucceed, msg)
					} else {
						msg.ReconsumeTimes++
						msgBackFailed = append(msgBackFailed, msg)
					}
				}
			}

			offset := pq.removeMessage(msgBackSucceed...)

			if offset >= 0 && !pq.IsDroppd() {
				pc.storage.update(mq, int64(offset), true)
			}
			if len(msgBackFailed) > 0 {
				// Messages that could not be sent back are consumed again
				// locally after a pause; the slot stays held in the meantime.
				subMsgs = msgBackFailed
				time.Sleep(5 * time.Second)
				goto RETRY
			}
		})
	}
}