in consumer/consumer.go [536:580]
func (dc *defaultConsumer) lockAll() {
mqMapSet := dc.buildProcessQueueTableByBrokerName()
for broker, mqs := range mqMapSet {
if len(mqs) == 0 {
continue
}
brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
if brokerResult == nil {
continue
}
body := &lockBatchRequestBody{
ConsumerGroup: dc.consumerGroup,
ClientId: dc.client.ClientID(),
MQs: mqs,
}
lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
set := make(map[primitive.MessageQueue]bool)
for idx := range lockedMQ {
_mq := lockedMQ[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
pq.UpdateLastConsumeTime()
}
set[_mq] = true
}
for idx := range mqs {
_mq := mqs[idx]
if !set[*_mq] {
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
pq.WithLock(false)
pq.UpdateLastLockTime()
rlog.Info("lock MessageQueue", map[string]interface{}{
"lockOK": false,
rlog.LogKeyConsumerGroup: dc.consumerGroup,
rlog.LogKeyMessageQueue: _mq.String(),
})
}
}
}
}
}