func()

in consumer/push_consumer.go [1225:1375]


// consumeMessageOrderly takes the messages buffered in pq for mq and hands
// them, batch by batch, to pc.consumeInner while holding the per-queue lock
// obtained from pc.queueLock.
//
// Consumption only proceeds when the consumer runs in BroadCasting mode or
// when pq reports a lock that has not expired; otherwise the work is handed
// to pc.tryLockLaterAndReconsume. The loop ends when pq is dropped, when pq
// yields no more messages, when the lock is lost or expired (Clustering
// mode), when the elapsed-time check against MaxTimeConsumeContinuously
// trips, or when the consume result asks for the queue to be suspended.
//
// After each batch the consume result decides whether pq.commit() is called;
// a positive offset returned by it is passed to pc.updateOffset as long as
// pq has not been dropped in the meantime.
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
	if pq.IsDroppd() {
		rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
			rlog.LogKeyMessageQueue: mq.String(),
		})
		return
	}

	// Serialize consumption of this queue; the lock is held until return.
	lock := pc.queueLock.fetchLock(*mq)
	lock.Lock()
	defer lock.Unlock()

	// Outside BroadCasting mode a live (held and unexpired) lock on pq is
	// required before any message may be consumed.
	if pc.model != BroadCasting && (!pq.IsLock() || pq.isLockExpired()) {
		if pq.IsDroppd() {
			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq.String(),
			})
			// Every consume path in this function bails out on a dropped
			// queue, so scheduling another lock attempt for it is pointless.
			return
		}
		pc.tryLockLaterAndReconsume(mq, 100)
		return
	}

	beginTime := time.Now()

	continueConsume := true
	for continueConsume {
		if pq.IsDroppd() {
			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq.String(),
			})
			break
		}
		if pc.model == Clustering {
			// The lock may have been lost or expired since the previous
			// iteration, so it is re-checked before every batch.
			if !pq.IsLock() {
				rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq.String(),
				})
				pc.tryLockLaterAndReconsume(mq, 10)
				return
			}
			if pq.isLockExpired() {
				rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq.String(),
				})
				pc.tryLockLaterAndReconsume(mq, 10)
				return
			}
		}
		interval := time.Since(beginTime)
		if interval > pc.option.MaxTimeConsumeContinuously {
			time.Sleep(10 * time.Millisecond)
			return
		}
		batchSize := pc.option.ConsumeMessageBatchMaxSize
		msgs := pq.takeMessages(batchSize)

		pc.resetRetryAndNamespace(msgs)

		if len(msgs) == 0 {
			// Nothing left to consume for this queue.
			break
		}

		// TODO: add message consumer hook
		// beginTime is restarted for every batch, so the interval check above
		// measures the time since the previous batch was handed to
		// consumeInner rather than since the loop started.
		// NOTE(review): confirm that this is the intended meaning of
		// MaxTimeConsumeContinuously.
		beginTime = time.Now()

		ctx := context.Background()
		msgCtx := &primitive.ConsumeMessageContext{
			Properties:    make(map[string]string),
			ConsumerGroup: pc.consumerGroup,
			MQ:            mq,
			Msgs:          msgs,
		}
		ctx = primitive.WithConsumerCtx(ctx, msgCtx)
		ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)

		orderlyCtx := primitive.NewConsumeOrderlyContext()
		orderlyCtx.MQ = *mq
		ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)

		// lockConsume is held for the duration of the consumeInner call.
		pq.lockConsume.Lock()
		result, err := pc.consumeInner(ctx, msgs)
		if err != nil {
			rlog.Warning("consumeMessage orderly error", map[string]interface{}{
				rlog.LogKeyUnderlayError: err,
				rlog.LogKeyMessages:      msgs,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
		}
		pq.lockConsume.Unlock()

		if result == Rollback || result == SuspendCurrentQueueAMoment {
			rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
				"messages":               msgs,
				rlog.LogKeyMessageQueue:  mq,
			})
		}

		// just put consumeResult in consumerMessageCtx
		//interval = time.Now().Sub(beginTime)
		//consumeReult := SuccessReturn
		//if interval > pc.option.ConsumeTimeout {
		//	consumeReult = TimeoutReturn
		//} else if SuspendCurrentQueueAMoment == result {
		//	consumeReult = FailedReturn
		//} else if ConsumeSuccess == result {
		//	consumeReult = SuccessReturn
		//}

		// process result
		commitOffset := int64(-1)
		if pc.option.AutoCommit {
			switch result {
			case Commit, Rollback:
				rlog.Warning("the message queue consume result is illegal, we think you want to ack these message", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq,
				})
				// Commit/Rollback are not valid results in auto-commit mode;
				// as the warning states, treat them as an ack and commit.
				fallthrough
			case ConsumeSuccess:
				commitOffset = pq.commit()
			case SuspendCurrentQueueAMoment:
				if pc.checkReconsumeTimes(msgs) {
					// Hand the batch back to pq, pause for the interval
					// carried by orderlyCtx, then leave the loop.
					pq.makeMessageToCosumeAgain(msgs...)
					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
					continueConsume = false
				} else {
					commitOffset = pq.commit()
				}
			}
		} else {
			switch result {
			case ConsumeSuccess:
				// Manual-commit mode: nothing is committed on plain success.
			case Commit:
				commitOffset = pq.commit()
			case Rollback:
				// pq.rollback
				time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
				continueConsume = false
			case SuspendCurrentQueueAMoment:
				// NOTE(review): unlike the AutoCommit branch, the batch is not
				// handed back via pq.makeMessageToCosumeAgain here - confirm
				// this is intended.
				if pc.checkReconsumeTimes(msgs) {
					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
					continueConsume = false
				}
			}
		}
		if commitOffset > 0 && !pq.IsDroppd() {
			// The error from updateOffset is deliberately ignored here
			// (best-effort), as in the original code.
			_ = pc.updateOffset(mq, commitOffset)
		}
	}
}