public RegisterResponseM2CV2 consumerRegisterC2MV2()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [1193:1350]


    /**
     * Handles a V2 register request sent by a consumer to the master.
     *
     * <p>The request is validated step by step: caller identity, client id, host name,
     * group name, master node status, subscribed topic list, {@code nodeId} against
     * {@code sourceCount}, consume-target authorization, resource requirements and the
     * consumer authorize info. The first failing validation ends processing and its error
     * code and message (taken from the shared {@link ProcessResult}) are returned.
     *
     * <p>When every validation passes, the consumer is added to {@code consumerHolder} with
     * consume type {@code CONSUME_CLIENT_REB} while a row lock keyed by the consumer id is
     * held, the group's subscribed topics and the consumer's heartbeat node are registered,
     * and the subscription reported by the client (if any) is stored in
     * {@code currentSubInfo}. The success response carries the broker configure id, the
     * broker configure list (only when the client's id differs from the master's), the ops
     * task info and the authorized info.
     *
     * @param request    the register request sent by the consumer
     * @param rmtAddress the caller's address; stored in the {@code ConsumerInfo} and passed
     *                   to the consumer authorization check
     * @param overtls    flag stored in the {@code ConsumerInfo}, logged as {@code isOverTLS}
     *                   and used to select the broker static info returned to the client
     * @return the register response; {@code errCode} is {@code TErrCodeConstants.SUCCESS}
     *         on success, otherwise the code of the failed step
     * @throws Throwable anything raised by the invoked helpers other than the
     *                   {@link IOException} handled around the row-lock section
     *                   (presumably {@code checkNodeStatus} when the master is not
     *                   serviceable - confirm against its implementation)
     */
    public RegisterResponseM2CV2 consumerRegisterC2MV2(RegisterRequestC2MV2 request,
            String rmtAddress,
            boolean overtls) throws Throwable {
        // "result" is reused by every check: on success it carries the validated value,
        // on failure the error code/message, so each getRetData() must directly follow
        // the check that produced it.
        ProcessResult result = new ProcessResult();
        final StringBuilder strBuff = new StringBuilder(512);
        RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder();
        if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
        if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        final String consumerId = (String) result.getRetData();
        // the host name is only validated here; its checked value is not used afterwards
        if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        final String groupName = (String) result.getRetData();
        // check master current status
        checkNodeStatus(consumerId, strBuff);
        if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(),
                request.getTopicListList(), strBuff, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        final Set<String> reqTopicSet = (Set<String>) result.getRetData();
        final Map<String, TreeSet<String>> reqTopicConditions =
                DataConverterUtil.convertTopicConditions(request.getTopicConditionList());
        // nodeId is range-checked only when the client declares a positive sourceCount
        int sourceCount = request.getSourceCount();
        int nodeId = request.getNodeId();
        if (sourceCount > 0 && (nodeId < 0 || nodeId > (sourceCount - 1))) {
            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
            builder.setErrMsg("Request nodeId value must be between in [0, sourceCount-1]!");
            return builder.build();
        }
        final String clientJdkVer = request.hasJdkVersion()
                ? request.getJdkVersion()
                : "";
        final ConsumeType csmType = ConsumeType.CONSUME_CLIENT_REB;
        // optional request sections are merged into freshly created holders
        OpsSyncInfo opsTaskInfo = new OpsSyncInfo();
        if (request.hasOpsTaskInfo()) {
            opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo());
        }
        ClientSyncInfo clientSyncInfo = new ClientSyncInfo();
        if (request.hasSubRepInfo()) {
            clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
        }
        // build consumer object
        ConsumerInfo inConsumerInfo =
                new ConsumerInfo(consumerId, overtls, groupName, csmType,
                        sourceCount, nodeId, reqTopicSet, reqTopicConditions,
                        opsTaskInfo.getCsmFromMaxOffsetCtrlId(), clientSyncInfo,
                        rmtAddress);
        // need removed for authorize center begin
        if (!this.defMetaDataService
                .isConsumeTargetAuthorized(consumerId, groupName,
                        reqTopicSet, reqTopicConditions, strBuff, result)) {
            if (strBuff.length() > 0) {
                logger.warn(strBuff.toString());
            }
            return buildRegisterC2MV2Failure(builder, result);
        }
        // need removed for authorize center end
        // check resource require
        if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo,
                masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(),
                groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) {
            return buildRegisterC2MV2Failure(builder, result);
        }
        // add the consumer while holding the row lock keyed by its id
        Integer lid = null;
        try {
            lid = masterRowLock.getLock(null,
                    StringUtils.getBytesUtf8(consumerId), true);
            if (!consumerHolder.addConsumer(inConsumerInfo, false, strBuff, result)) {
                return buildRegisterC2MV2Failure(builder, result);
            }
            topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet);
            heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId));
        } catch (IOException e) {
            logger.warn("Failed to lock.", e);
            // Without the lock the consumer was not added and its heartbeat was not
            // registered, so report the failure explicitly instead of falling through
            // to the post-registration path; the finally block still runs.
            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            builder.setErrMsg("Failed to acquire the register lock for consumer "
                    + consumerId + ", please try again later!");
            return builder.build();
        } finally {
            if (lid != null) {
                this.masterRowLock.releaseRowLock(lid);
            }
        }
        // re-read the stored records; the response is built from the holder's view
        ConsumeGroupInfo consumeGroupInfo =
                consumerHolder.getConsumeGroupInfo(groupName);
        if (consumeGroupInfo == null) {
            logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
                    .append(" visit consume group(").append(groupName)
                    .append(" info failure, null information").toString());
            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            builder.setErrMsg(strBuff.toString());
            strBuff.delete(0, strBuff.length());
            return builder.build();
        }
        inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId);
        if (inConsumerInfo == null) {
            logger.warn(strBuff.append("[Illegal Process] ").append(consumerId)
                    .append(" visit consume info failure, null information").toString());
            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            builder.setErrMsg(strBuff.toString());
            strBuff.delete(0, strBuff.length());
            return builder.build();
        }
        // record the subscription reported by the client: topic -> (partition key -> partition)
        Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>();
        currentSubInfo.put(consumerId, topicPartSubMap);
        Tuple2<Boolean, Set<Partition>> reportInfo = clientSyncInfo.getRepSubInfo();
        if (reportInfo.getF0()) {
            for (Partition info : reportInfo.getF1()) {
                Map<String, Partition> partMap =
                        topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>());
                partMap.put(info.getPartitionKey(), info);
            }
            printReportInfo(consumerId, null, topicPartSubMap, strBuff);
        }
        logger.info(strBuff.append("[Consumer Register] ")
                .append(consumerId).append(", isOverTLS=").append(overtls)
                .append(", clientJDKVer=").append(clientJdkVer).toString());
        strBuff.delete(0, strBuff.length());
        // the broker configure list is only sent when the client's configure id is stale
        Tuple2<Long, Map<Integer, String>> brokerStaticInfo =
                brokerRunManager.getBrokerStaticInfo(overtls);
        builder.setBrokerConfigId(brokerStaticInfo.getF0());
        if (clientSyncInfo.getBrokerConfigId() != brokerStaticInfo.getF0()) {
            builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values());
        }
        builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo));
        builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false));
        builder.setErrCode(TErrCodeConstants.SUCCESS);
        builder.setErrMsg("OK!");
        return builder.build();
    }

    /**
     * Builds the failure response of {@code consumerRegisterC2MV2} from a failed check.
     *
     * @param builder the response builder to complete
     * @param result  the process result holding the failed step's error code and message
     * @return the built response carrying that error code and message
     */
    private static RegisterResponseM2CV2 buildRegisterC2MV2Failure(
            RegisterResponseM2CV2.Builder builder, ProcessResult result) {
        builder.setErrCode(result.getErrCode());
        builder.setErrMsg(result.getErrMsg());
        return builder.build();
    }