in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java [628:758]
/**
 * Check whether the subscribe settings carried by a consumer are consistent
 * with the settings already recorded for this consume group.
 *
 * The checks are performed in order: consume type, subscribed topic set,
 * per-topic filter conditions, and then the type-specific parameters
 * (bound-consume parameters, or sourceCount/nodeId for client-balance
 * consumers when this group's sourceCount is greater than 0).
 *
 * On every failure detected by this method the error code and message are
 * stored in {@code result}, the message is logged at warn level, and
 * {@code strBuff} is cleared so the caller can safely reuse the buffer.
 *
 * @param inConsumer  the consumer to be validated
 * @param strBuff     the shared string buffer used to build the error message
 * @param result      the holder that receives the success or failure result
 * @return  true if the consumer is consistent with the group, false otherwise
 */
private boolean validConsumerInfo(ConsumerInfo inConsumer,
        StringBuilder strBuff, ProcessResult result) {
    String errMsg;
    // check whether the consumer behavior is consistent
    if (inConsumer.getConsumeType() != this.consumeType) {
        strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                .append(" using ").append(inConsumer.getConsumeType().getName())
                .append(" subscribe is inconsistency with other consumers using ")
                .append(this.consumeType.getName())
                .append(" subscribe in the group");
        errMsg = strBuff.toString();
        result.setFailResult(
                TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, errMsg);
        logger.warn(errMsg);
        strBuff.delete(0, strBuff.length());
        return false;
    }
    // check the topics of consumption
    if (CollectionUtils.isNotEmpty(topicSet)
            && (topicSet.size() != inConsumer.getTopicSet().size()
                    || !topicSet.containsAll(inConsumer.getTopicSet()))) {
        strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                .append(" subscribed topics ").append(inConsumer.getTopicSet())
                .append(" is inconsistency with other consumers in the group, existedTopics: ")
                .append(topicSet);
        errMsg = strBuff.toString();
        result.setFailResult(
                TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, errMsg);
        logger.warn(errMsg);
        strBuff.delete(0, strBuff.length());
        return false;
    }
    // check the topic conditions of consumption
    boolean isCondEqual = true;
    if (topicConditions.isEmpty()) {
        // the group consumes without filters, so the consumer must not carry any
        if (!inConsumer.getTopicConditions().isEmpty()) {
            isCondEqual = false;
            strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append(" subscribe with filter condition ")
                    .append(inConsumer.getTopicConditions())
                    .append(" is inconsistency with other consumers in the group: topic without conditions");
        }
    } else if (inConsumer.getTopicConditions().isEmpty()) {
        // the group consumes with filters, but the consumer carries none
        isCondEqual = false;
        strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                .append(" subscribe without filter condition ")
                .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
                .append(topicConditions);
    } else {
        // both sides carry filters: the filtered topics must be the same set,
        // and each topic must carry the same set of filter items
        Set<String> existedCondTopics = topicConditions.keySet();
        Set<String> reqCondTopics = inConsumer.getTopicConditions().keySet();
        isCondEqual = existedCondTopics.size() == reqCondTopics.size()
                && existedCondTopics.containsAll(reqCondTopics);
        if (isCondEqual) {
            for (String topicKey : existedCondTopics) {
                if ((topicConditions.get(topicKey).size()
                        != inConsumer.getTopicConditions().get(topicKey).size())
                        || (!topicConditions.get(topicKey).containsAll(
                                inConsumer.getTopicConditions().get(topicKey)))) {
                    isCondEqual = false;
                    break;
                }
            }
        }
        if (!isCondEqual) {
            strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append(" subscribe with filter condition ")
                    .append(inConsumer.getTopicConditions())
                    .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
                    .append(topicConditions);
        }
    }
    if (!isCondEqual) {
        errMsg = strBuff.toString();
        result.setFailResult(
                TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, errMsg);
        logger.warn(errMsg);
        // clear the shared buffer, as the earlier failure branches do
        strBuff.delete(0, strBuff.length());
        return false;
    }
    // Check the validity of bound consumer's parameters
    if (this.consumeType == ConsumeType.CONSUME_BAND) {
        if (!validBoundParameters(inConsumer, strBuff, result)) {
            return false;
        }
    } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB
            && this.sourceCount > 0) {
        // the sourceCount reported by the consumer must match the group's value
        if (this.sourceCount != inConsumer.getSourceCount()) {
            strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
                    .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount());
            errMsg = strBuff.toString();
            result.setFailResult(
                    TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, errMsg);
            logger.warn(errMsg);
            strBuff.delete(0, strBuff.length());
            return false;
        }
        // the nodeId must not already be held by a different consumer in the group
        boolean foundOccupied = false;
        int occupiedNodeId = -1;
        String occupiedConsumerId = null;
        for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
            if (consumerInfo == null) {
                continue;
            }
            if (consumerInfo.getNodeId() == inConsumer.getNodeId()
                    && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
                foundOccupied = true;
                occupiedNodeId = consumerInfo.getNodeId();
                occupiedConsumerId = consumerInfo.getConsumerId();
                break;
            }
        }
        if (foundOccupied) {
            strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append("'s nodeId value(").append(inConsumer.getNodeId())
                    .append(") is occupied by ").append(occupiedConsumerId)
                    .append("'s nodeId value(").append(occupiedNodeId)
                    .append(") in the group!");
            errMsg = strBuff.toString();
            result.setFailResult(
                    TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, errMsg);
            logger.warn(errMsg);
            strBuff.delete(0, strBuff.length());
            return false;
        }
    }
    result.setSuccResult("Ok");
    return true;
}