// Excerpt from consumer/push_consumer.go (original lines 1225-1375).
// consumeMessageOrderly consumes the messages buffered in pq for message
// queue mq strictly in order. Ordering is enforced by (a) a process-local
// per-queue mutex so at most one goroutine consumes mq at a time, and (b) in
// Clustering mode, a broker-side queue lock that must be held and unexpired;
// when that lock is missing or expired the queue is rescheduled via
// tryLockLaterAndReconsume. Committed offsets are persisted with updateOffset.
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
	if pq.IsDroppd() {
		rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
			rlog.LogKeyMessageQueue: mq.String(),
		})
		return
	}

	// Process-local serialization: one consumer goroutine per message queue.
	lock := pc.queueLock.fetchLock(*mq)
	lock.Lock()
	defer lock.Unlock()

	// Broadcasting needs no broker lock; Clustering requires a valid,
	// unexpired broker-side lock before ordered consumption may start.
	if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
		beginTime := time.Now()
		continueConsume := true
		for continueConsume {
			if pq.IsDroppd() {
				rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq.String(),
				})
				break
			}
			if pc.model == Clustering {
				if !pq.IsLock() {
					rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
				if pq.isLockExpired() {
					rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
			}
			// Yield this queue after consuming continuously for too long so
			// other queues are not starved.
			// NOTE(review): beginTime is reset per batch below, so this
			// measures time since the last batch started, not since loop
			// entry — confirm that is the intended semantics.
			if time.Since(beginTime) > pc.option.MaxTimeConsumeContinuously {
				time.Sleep(10 * time.Millisecond)
				return
			}
			batchSize := pc.option.ConsumeMessageBatchMaxSize
			msgs := pq.takeMessages(batchSize)
			pc.resetRetryAndNamespace(msgs)
			if len(msgs) == 0 {
				// Nothing buffered; stop and wait for the next dispatch.
				break
			}

			// TODO: add message consumer hook
			beginTime = time.Now()

			ctx := context.Background()
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          msgs,
			}
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)

			orderlyCtx := primitive.NewConsumeOrderlyContext()
			orderlyCtx.MQ = *mq
			ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)

			// lockConsume protects the user callback from racing with a
			// rebalance that drops or unlocks this queue mid-consume.
			pq.lockConsume.Lock()
			result, err := pc.consumeInner(ctx, msgs)
			if err != nil {
				rlog.Warning("consumeMessage orderly error", map[string]interface{}{
					rlog.LogKeyUnderlayError: err,
					rlog.LogKeyMessages:      msgs,
					rlog.LogKeyMessageQueue:  mq.String(),
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
			}
			pq.lockConsume.Unlock()

			if result == Rollback || result == SuspendCurrentQueueAMoment {
				rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
					"messages":               msgs,
					rlog.LogKeyMessageQueue:  mq,
				})
			}

			// process result
			commitOffset := int64(-1)
			if pc.option.AutoCommit {
				switch result {
				case Commit, Rollback:
					// Commit/Rollback are illegal while AutoCommit is on;
					// treat them as an ack and commit anyway — the Java
					// client falls through to the SUCCESS branch here.
					// Fixed: previously this only warned (with a dangling
					// "%v" placeholder) and never committed.
					rlog.Warning("the message queue consume result is illegal, we think you want to ack these message", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq,
					})
					commitOffset = pq.commit()
				case ConsumeSuccess:
					commitOffset = pq.commit()
				case SuspendCurrentQueueAMoment:
					if pc.checkReconsumeTimes(msgs) {
						// Retry budget remains: requeue locally and back off
						// for the configured suspend interval.
						pq.makeMessageToCosumeAgain(msgs...)
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					} else {
						// Retries exhausted (handled inside
						// checkReconsumeTimes); ack and move on.
						commitOffset = pq.commit()
					}
				default:
				}
			} else {
				switch result {
				case ConsumeSuccess:
					// Manual-commit mode: success alone does not advance the
					// offset; the callback must return Commit to persist it.
				case Commit:
					commitOffset = pq.commit()
				case Rollback:
					// pq.rollback
					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
					continueConsume = false
				case SuspendCurrentQueueAMoment:
					if pc.checkReconsumeTimes(msgs) {
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					}
				default:
				}
			}

			// NOTE(review): the Java client checks commitOffset >= 0 here;
			// a strictly-positive check skips persisting a legitimate
			// offset of 0 — confirm before changing.
			if commitOffset > 0 && !pq.IsDroppd() {
				_ = pc.updateOffset(mq, commitOffset)
			}
		}
	} else {
		if pq.IsDroppd() {
			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq.String(),
			})
		}
		// Broker lock unavailable: schedule a lock attempt and reconsume
		// after a longer (100ms) delay.
		pc.tryLockLaterAndReconsume(mq, 100)
	}
}