private boolean validConsumerInfo()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java [628:758]


    /**
     * Checks whether the settings of an incoming consumer agree with the settings
     * already recorded for this consume group.
     *
     * <p>The checks run in this order and stop at the first mismatch: consume type,
     * subscribed topic set, per-topic filter conditions, and then the
     * consume-type specific parameters (bound parameters via
     * {@code validBoundParameters}, or sourceCount / nodeId for client-balance groups
     * whose {@code sourceCount} is greater than zero).
     *
     * <p>On every mismatch detected directly in this method the failure message is
     * stored in {@code result}, logged at WARN level, and {@code strBuff} is emptied
     * again so the shared buffer does not carry stale text back to the caller.
     *
     * @param inConsumer the consumer asking to join the group
     * @param strBuff    scratch buffer used to assemble failure messages; messages are
     *                   appended to whatever it already holds, so it is expected to be
     *                   empty on entry
     * @param result     receives the error code and message on failure, or "Ok" on success
     * @return {@code true} if the consumer is consistent with the group, {@code false} otherwise
     */
    private boolean validConsumerInfo(ConsumerInfo inConsumer,
            StringBuilder strBuff, ProcessResult result) {
        String errMsg;
        // check whether the consumer behavior is consistent
        if (inConsumer.getConsumeType() != this.consumeType) {
            errMsg = strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append(" using ").append(inConsumer.getConsumeType().getName())
                    .append(" subscribe is inconsistency with other consumers using ")
                    .append(this.consumeType.getName())
                    .append(" subscribe in the group").toString();
            strBuff.delete(0, strBuff.length());
            result.setFailResult(
                    TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, errMsg);
            logger.warn(errMsg);
            return false;
        }
        // check the topics of consumption; skipped while the group has no topic set yet
        if (CollectionUtils.isNotEmpty(topicSet)
                && (topicSet.size() != inConsumer.getTopicSet().size()
                        || !topicSet.containsAll(inConsumer.getTopicSet()))) {
            errMsg = strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append(" subscribed topics ").append(inConsumer.getTopicSet())
                    .append(" is inconsistency with other consumers in the group, existedTopics: ")
                    .append(topicSet).toString();
            strBuff.delete(0, strBuff.length());
            result.setFailResult(
                    TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, errMsg);
            logger.warn(errMsg);
            return false;
        }
        // check the topic conditions of consumption
        boolean isCondEqual = true;
        if (topicConditions.isEmpty()) {
            // the group consumes without filters, so the consumer must do the same
            if (!inConsumer.getTopicConditions().isEmpty()) {
                isCondEqual = false;
                strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                        .append(" subscribe with filter condition ")
                        .append(inConsumer.getTopicConditions())
                        .append(" is inconsistency with other consumers in the group: topic without conditions");
            }
        } else if (inConsumer.getTopicConditions().isEmpty()) {
            // the group consumes with filters, but the consumer supplied none
            isCondEqual = false;
            strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                    .append(" subscribe without filter condition ")
                    .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
                    .append(topicConditions);
        } else {
            // both sides have filters: the filtered topics and each topic's
            // condition items must match exactly
            Set<String> existedCondTopics = topicConditions.keySet();
            Set<String> reqCondTopics = inConsumer.getTopicConditions().keySet();
            boolean sameConditions = existedCondTopics.size() == reqCondTopics.size()
                    && existedCondTopics.containsAll(reqCondTopics);
            if (sameConditions) {
                // key sets are identical here, so every lookup below finds an entry
                for (String topicKey : existedCondTopics) {
                    if ((topicConditions.get(topicKey).size()
                            != inConsumer.getTopicConditions().get(topicKey).size())
                            || (!topicConditions.get(topicKey).containsAll(
                                    inConsumer.getTopicConditions().get(topicKey)))) {
                        sameConditions = false;
                        break;
                    }
                }
            }
            if (!sameConditions) {
                isCondEqual = false;
                strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                        .append(" subscribe with filter condition ")
                        .append(inConsumer.getTopicConditions())
                        .append(" is inconsistency with other consumers in the group, existed topic conditions is ")
                        .append(topicConditions);
            }
        }
        if (!isCondEqual) {
            errMsg = strBuff.toString();
            strBuff.delete(0, strBuff.length());
            result.setFailResult(
                    TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, errMsg);
            logger.warn(errMsg);
            return false;
        }
        // Check the validity of bound consumer's parameters
        if (this.consumeType == ConsumeType.CONSUME_BAND) {
            if (!validBoundParameters(inConsumer, strBuff, result)) {
                return false;
            }
        } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) {
            if (this.sourceCount > 0) {
                if (this.sourceCount != inConsumer.getSourceCount()) {
                    errMsg = strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                            .append("'s sourceCount is inconsistency with other consumers in the group, required is ")
                            .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount())
                            .toString();
                    strBuff.delete(0, strBuff.length());
                    result.setFailResult(
                            TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, errMsg);
                    logger.warn(errMsg);
                    return false;
                }
                // look for a different consumer that already holds the requested nodeId
                ConsumerInfo occupier = null;
                for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
                    if (consumerInfo == null) {
                        continue;
                    }
                    if (consumerInfo.getNodeId() == inConsumer.getNodeId()
                            && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
                        occupier = consumerInfo;
                        break;
                    }
                }
                if (occupier != null) {
                    errMsg = strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                            .append("'s nodeId value(").append(inConsumer.getNodeId())
                            .append(") is occupied by ").append(occupier.getConsumerId())
                            .append("'s nodeId value(").append(occupier.getNodeId())
                            .append(") in the group!").toString();
                    strBuff.delete(0, strBuff.length());
                    result.setFailResult(
                            TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, errMsg);
                    logger.warn(errMsg);
                    return false;
                }
            }
        }
        result.setSuccResult("Ok");
        return true;
    }