func()

in consumer/consumer.go [558:602]


// lockAll requests a batch lock, broker by broker, for the message queues
// returned by buildProcessQueueTableByBrokerName and then updates the matching
// process queues according to what each broker actually granted.
//
// A broker is skipped when it has no queues or when
// FindBrokerAddressInSubscribe returns nil for it. For the remaining brokers:
//   - every queue returned by doLock is flagged as locked;
//   - every requested queue that was NOT returned is flagged as unlocked and
//     the failure is logged.
//
// Queues without an entry in processQueueTable are ignored in both cases.
func (dc *defaultConsumer) lockAll() {
	for broker, mqs := range dc.buildProcessQueueTableByBrokerName() {
		if len(mqs) == 0 {
			continue
		}
		brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
		if brokerResult == nil {
			continue
		}

		lockedMQ := dc.doLock(brokerResult.BrokerAddr, &lockBatchRequestBody{
			ConsumerGroup: dc.consumerGroup,
			ClientId:      dc.client.ClientID(),
			MQs:           mqs,
		})

		// Remember which queues the broker granted so the second pass can
		// detect the ones it refused.
		granted := make(map[primitive.MessageQueue]bool, len(lockedMQ))
		for _, mq := range lockedMQ {
			if v, ok := dc.processQueueTable.Load(mq); ok {
				pq := v.(*processQueue)
				pq.WithLock(true)
				// NOTE(review): the success path refreshes the consume time while
				// the failure path below refreshes the lock time; this looks
				// swapped — confirm against processQueue's lock-expiry logic.
				pq.UpdateLastConsumeTime()
			}
			granted[mq] = true
		}

		for _, mq := range mqs {
			if granted[*mq] {
				continue
			}
			// mqs holds pointers, but the table is looked up by MessageQueue
			// value in the loop above. Dereference here: a pointer key would
			// never compare equal to a value key, so the lookup would always
			// miss and the queue would never be marked as unlocked.
			v, ok := dc.processQueueTable.Load(*mq)
			if !ok {
				continue
			}
			pq := v.(*processQueue)
			pq.WithLock(false)
			pq.UpdateLastLockTime()
			rlog.Info("lock MessageQueue", map[string]interface{}{
				"lockOK":                 false,
				rlog.LogKeyConsumerGroup: dc.consumerGroup,
				rlog.LogKeyMessageQueue:  mq.String(),
			})
		}
	}
}