public RegisterResponseM2C consumerRegisterC2M()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [529:708]


    public RegisterResponseM2C consumerRegisterC2M(RegisterRequestC2M request,
            String rmtAddress,
            boolean overtls) throws Exception {
        // #lizard forgives
        ProcessResult result = new ProcessResult();
        final StringBuilder strBuff = new StringBuilder(512);
        RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder();
        builder.setSuccess(false);
        if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
        if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final String consumerId = (String) result.getRetData();
        if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        // final String hostName = (String) result.getRetData();
        if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final String groupName = (String) result.getRetData();
        checkNodeStatus(consumerId, strBuff);
        if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
                request.getTopicListList(), strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final Set<String> reqTopicSet = (Set<String>) result.getRetData();
        final Map<String, TreeSet<String>> reqTopicConditions =
                DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
        String requiredParts = request.hasRequiredPartition() ? request.getRequiredPartition() : "";
        ConsumeType csmType = (request.hasRequireBound() && request.getRequireBound())
                ? ConsumeType.CONSUME_BAND
                : ConsumeType.CONSUME_NORMAL;
        final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : "";
        if (!PBParameterUtils.checkConsumerOffsetSetInfo(
                csmType, reqTopicSet, requiredParts, strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        Map<String, Long> requiredPartMap = (Map<String, Long>) result.getRetData();
        String sessionKey = request.hasSessionKey() ? request.getSessionKey() : "";
        long sessionTime = request.hasSessionTime()
                ? request.getSessionTime()
                : System.currentTimeMillis();
        int sourceCount = request.hasTotalCount()
                ? request.getTotalCount()
                : -1;
        int qryPriorityId = request.hasQryPriorityId()
                ? request.getQryPriorityId()
                : TBaseConstants.META_VALUE_UNDEFINED;
        List<SubscribeInfo> subscribeList =
                DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
        boolean isNotAllocated = true;
        if (CollectionUtils.isNotEmpty(subscribeList)
                || ((request.hasNotAllocated() && !request.getNotAllocated()))) {
            isNotAllocated = false;
        }
        boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
        // build consumer object
        ConsumerInfo inConsumerInfo =
                new ConsumerInfo(consumerId, overtls, groupName,
                        reqTopicSet, reqTopicConditions, csmType,
                        sessionKey, sessionTime, sourceCount,
                        isSelectBig, requiredPartMap, rmtAddress);
        if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
                masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData();
        if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
                groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        // need removed for authorize center begin
        if (!this.defMetaDataService
                .isConsumeTargetAuthorized(consumerId, groupName,
                        reqTopicSet, reqTopicConditions, strBuff, result)) {
            if (strBuff.length() > 0) {
                logger.warn(strBuff.toString());
            }
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        // need removed for authorize center end
        Integer lid = null;
        ConsumeGroupInfo consumeGroupInfo = null;
        try {
            lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true);
            if (!consumerHolder.addConsumer(inConsumerInfo2,
                    isNotAllocated, strBuff, result)) {
                builder.setErrCode(result.getErrCode());
                builder.setErrMsg(result.getErrMsg());
                return builder.build();
            }
            consumeGroupInfo = (ConsumeGroupInfo) result.getRetData();
            topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
            if (CollectionUtils.isNotEmpty(subscribeList)) {
                int reportCnt = 0;
                Map<String, Partition> partMap;
                Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
                currentSubInfo.put(consumerId, topicPartSubMap);
                strBuff.append("[SubInfo Report] client=").append(consumerId)
                        .append(", subscribed partitions=[");
                for (SubscribeInfo info : subscribeList) {
                    partMap = topicPartSubMap.computeIfAbsent(
                            info.getTopic(), k -> new HashMap<>());
                    partMap.put(info.getPartition().getPartitionKey(), info.getPartition());
                    if (reportCnt++ > 0) {
                        strBuff.append(",");
                    }
                    strBuff.append(info.getPartitionStr());
                }
                strBuff.append("]");
                logger.info(strBuff.toString());
                strBuff.delete(0, strBuff.length());
            }
            heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
        } catch (IOException e) {
            logger.warn("Failed to lock.", e);
        } finally {
            if (lid != null) {
                this.masterRowLock.releaseRowLock(lid);
            }
        }
        logger.info(strBuff.append("[Consumer Register] ")
                .append(consumerId).append(", isOverTLS=").append(overtls)
                .append(", clientJDKVer=").append(clientJdkVer).toString());
        strBuff.delete(0, strBuff.length());
        if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) {
            builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
            builder.setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
            builder.setGroupFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
            builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
            builder.setDefFlowControlInfo(" ");
            builder.setGroupFlowControlInfo(" ");
            ClusterSettingEntity defSetting =
                    defMetaDataService.getClusterDefSetting(false);
            GroupResCtrlEntity groupResCtrlConf =
                    defMetaDataService.getGroupCtrlConf(groupName);
            if (defSetting.enableFlowCtrl()) {
                builder.setDefFlowCheckId(defSetting.getSerialId());
                if (request.getDefFlowCheckId() != defSetting.getSerialId()) {
                    builder.setDefFlowControlInfo(defSetting.getGloFlowCtrlRuleInfo());
                }
            }
            if (groupResCtrlConf != null
                    && groupResCtrlConf.isFlowCtrlEnable()) {
                builder.setGroupFlowCheckId(groupResCtrlConf.getSerialId());
                builder.setQryPriorityId(groupResCtrlConf.getQryPriorityId());
                if (request.getGroupFlowCheckId() != groupResCtrlConf.getSerialId()) {
                    builder.setGroupFlowControlInfo(groupResCtrlConf.getFlowCtrlInfo());
                }
            }
        }
        builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false));
        builder.setNotAllocated(consumeGroupInfo.isNotAllocate());
        builder.setSuccess(true);
        builder.setErrCode(TErrCodeConstants.SUCCESS);
        builder.setErrMsg("OK!");
        return builder.build();
    }