in client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java [510:695]
/**
 * Reconciles the local push and pop process-queue tables of {@code topic} with the given assignments.
 *
 * <p>The work is done in this order:
 * <ol>
 *   <li>split the assignments into a POP set and a push set (every mode other than POP); assignments
 *       without a message queue are skipped;</li>
 *   <li>for a topic that does not start with the retry-group prefix, subscribe to the pop retry topic
 *       when only push assignments exist, or unsubscribe from it when only pop assignments exist;</li>
 *   <li>mark as dropped and try to remove push process queues of this topic that are no longer assigned,
 *       or whose pull has expired while consuming passively;</li>
 *   <li>do the same for pop process queues;</li>
 *   <li>create process queues and pull requests for newly assigned push queues, then dispatch them;</li>
 *   <li>create pop process queues and pop requests for newly assigned pop queues, then dispatch them.</li>
 * </ol>
 *
 * @param topic       topic whose assignment is being applied
 * @param assignments assignments for the topic; must not be {@code null}
 * @param isOrder     when {@code true}, a new push queue is only added after {@code lock(mq)} succeeds;
 *                    if any lock fails another rebalance is requested via {@code rebalanceLater(500)}
 * @return {@code true} if at least one process queue was removed from or added to either table
 */
private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments,
    final boolean isOrder) {
    boolean changed = false;

    // Partition the assignments by request mode.
    Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<>();
    Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<>();
    for (MessageQueueAssignment assignment : assignments) {
        MessageQueue messageQueue = assignment.getMessageQueue();
        if (messageQueue == null) {
            continue;
        }
        if (MessageRequestMode.POP == assignment.getMode()) {
            mq2PopAssignment.put(messageQueue, assignment);
        } else {
            mq2PushAssignment.put(messageQueue, assignment);
        }
    }

    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
            //pop switch to push
            //subscribe pop retry topic
            try {
                final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                getSubscriptionInner().put(retryTopic, subscriptionData);
            } catch (Exception e) {
                // Best effort: the rebalance continues, but the failure must not go unnoticed.
                log.warn("doRebalance, {}, subscribe pop retry topic failed, topic={}", consumerGroup, topic, e);
            }
        } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
            //push switch to pop
            //unsubscribe pop retry topic
            try {
                final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
                getSubscriptionInner().remove(retryTopic);
            } catch (Exception e) {
                // Best effort: the rebalance continues, but the failure must not go unnoticed.
                log.warn("doRebalance, {}, unsubscribe pop retry topic failed, topic={}", consumerGroup, topic, e);
            }
        }
    }

    {
        // drop process queues no longer belong me
        Map<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<>(this.processQueueTable.size());
        for (Entry<MessageQueue, ProcessQueue> next : this.processQueueTable.entrySet()) {
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }
            if (!mq2PushAssignment.containsKey(mq)) {
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
            } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                // Still assigned, but the pull has expired: drop it so the add phase below can recreate it.
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
                log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                    consumerGroup, mq);
            }
        }

        // remove message queues no longer belong me
        for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
            MessageQueue mq = entry.getKey();
            ProcessQueue pq = entry.getValue();
            // The table entry is only removed when removeUnnecessaryMessageQueue reports success.
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                this.processQueueTable.remove(mq);
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
            }
        }
    }

    {
        // drop pop process queues no longer belong me
        Map<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<>(this.popProcessQueueTable.size());
        for (Entry<MessageQueue, PopProcessQueue> next : this.popProcessQueueTable.entrySet()) {
            MessageQueue mq = next.getKey();
            PopProcessQueue pq = next.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }
            if (!mq2PopAssignment.containsKey(mq)) {
                //the queue is no longer your assignment
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
            } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                // Still assigned, but the pop has expired: drop it so the add phase below can recreate it.
                pq.setDropped(true);
                removeQueueMap.put(mq, pq);
                log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it",
                    consumerGroup, mq);
            }
        }

        // remove message queues no longer belong me
        for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) {
            MessageQueue mq = entry.getKey();
            PopProcessQueue pq = entry.getValue();
            // The table entry is only removed when removeUnnecessaryPopMessageQueue reports success.
            if (this.removeUnnecessaryPopMessageQueue(mq, pq)) {
                this.popProcessQueueTable.remove(mq);
                changed = true;
                log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq);
            }
        }
    }

    {
        // add new message queue
        boolean allMQLocked = true;
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (MessageQueue mq : mq2PushAssignment.keySet()) {
            if (this.processQueueTable.containsKey(mq)) {
                continue;
            }
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                allMQLocked = false;
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = createProcessQueue();
            pq.setLocked(true);

            long nextOffset;
            try {
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                // Skip this queue for now; keep the cause so the failure can be diagnosed.
                log.warn("doRebalance, {}, compute offset failed, {}", consumerGroup, mq, e);
                continue;
            }

            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }

        // At least one ordered queue could not be locked: ask for another rebalance.
        if (!allMQLocked) {
            mQClientFactory.rebalanceLater(500);
        }
        this.dispatchPullRequest(pullRequestList, 500);
    }

    {
        // add new pop message queue
        List<PopRequest> popRequestList = new ArrayList<>();
        for (MessageQueue mq : mq2PopAssignment.keySet()) {
            if (this.popProcessQueueTable.containsKey(mq)) {
                continue;
            }
            PopProcessQueue pq = createPopProcessQueue();
            PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
                PopRequest popRequest = new PopRequest();
                popRequest.setTopic(topic);
                popRequest.setConsumerGroup(consumerGroup);
                popRequest.setMessageQueue(mq);
                popRequest.setPopProcessQueue(pq);
                popRequest.setInitMode(getConsumeInitMode());
                popRequestList.add(popRequest);
                changed = true;
            }
        }
        this.dispatchPopPullRequest(popRequestList, 500);
    }

    return changed;
}