public HeartResponseM2C consumerHeartbeatC2M()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java [720:876]


    /**
     * Handle a heartbeat request sent by a consumer and build the reply.
     *
     * <p>The steps are executed in this order; the first failing check
     * returns a response with {@code success=false} plus the error code/message:
     * <ol>
     *   <li>validate the authentication info, the client id and the group name;</li>
     *   <li>look up the consume group in {@code consumerHolder}
     *       ({@code HB_NO_NODE} if it is absent);</li>
     *   <li>validate the consume authorization of the group;</li>
     *   <li>refresh the consumer node in {@code heartbeatManager}
     *       ({@code HB_NO_NODE} on {@code HeartbeatException});</li>
     *   <li>if the request reports subscribe info that fails {@code checkIfConsist},
     *       replace the cached per-client partition map with the reported one;</li>
     *   <li>if the request carries an event with a positive rebalance id, mark the
     *       group as allocated and remove the first queued event of the client;</li>
     *   <li>if the next queued event is not yet in {@code PROCESSING} status,
     *       mark it as such and attach it to the response;</li>
     *   <li>if the request carries a flow-control check id, fill in the
     *       flow-control related fields;</li>
     *   <li>attach the authorized info and the allocation flag, then mark success.</li>
     * </ol>
     *
     * @param request    the heartbeat request of the consumer
     * @param rmtAddress the remote address, forwarded to the authorization check
     * @param overtls    not read by this method
     * @return the heartbeat response
     * @throws Throwable anything raised by the invoked components that is not
     *                   handled inside this method
     */
    public HeartResponseM2C consumerHeartbeatC2M(HeartRequestC2M request,
            final String rmtAddress,
            boolean overtls) throws Throwable {
        // #lizard forgives
        final ProcessResult procResult = new ProcessResult();
        final StringBuilder sBuffer = new StringBuilder(512);
        // the response stays a failure until every step below has passed
        final HeartResponseM2C.Builder rspBuilder = HeartResponseM2C.newBuilder();
        rspBuilder.setSuccess(false);
        // validate the identity of the caller
        if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, procResult)) {
            return rspBuilder.setErrCode(procResult.getErrCode())
                    .setErrMsg(procResult.getErrMsg())
                    .build();
        }
        final CertifiedInfo certifiedInfo = (CertifiedInfo) procResult.getRetData();
        // validate the client id
        if (!PBParameterUtils.checkClientId(request.getClientId(), sBuffer, procResult)) {
            return rspBuilder.setErrCode(procResult.getErrCode())
                    .setErrMsg(procResult.getErrMsg())
                    .build();
        }
        final String clientId = (String) procResult.getRetData();
        // validate the group name
        if (!PBParameterUtils.checkGroupName(request.getGroupName(), sBuffer, procResult)) {
            return rspBuilder.setErrCode(procResult.getErrCode())
                    .setErrMsg(procResult.getErrMsg())
                    .build();
        }
        final String groupName = (String) procResult.getRetData();
        checkNodeStatus(clientId, sBuffer);
        // the group must already be known by the holder
        final ConsumeGroupInfo groupInfo = consumerHolder.getConsumeGroupInfo(groupName);
        if (groupInfo == null) {
            return rspBuilder.setErrCode(TErrCodeConstants.HB_NO_NODE)
                    .setErrMsg(sBuffer.append("Not found groupName ")
                            .append(groupName).append(" in holder!").toString())
                    .build();
        }
        // validate the consume authorization
        final boolean authorized = serverAuthHandler.validConsumerAuthorizeInfo(
                certifiedInfo.getUserName(), groupName, groupInfo.getTopicSet(),
                groupInfo.getTopicConditions(), rmtAddress, procResult);
        if (!authorized) {
            return rspBuilder.setErrCode(procResult.getErrCode())
                    .setErrMsg(procResult.getErrMsg())
                    .build();
        }
        // refresh the heartbeat record of the consumer
        try {
            heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId));
        } catch (HeartbeatException e) {
            return rspBuilder.setErrCode(TErrCodeConstants.HB_NO_NODE)
                    .setErrMsg(sBuffer.append("Update consumer node exception:")
                            .append(e.getMessage()).toString())
                    .build();
        }
        // fetch, or lazily register, the cached subscription of this client
        Map<String, Map<String, Partition>> cachedSubMap = currentSubInfo.get(clientId);
        if (cachedSubMap == null) {
            final Map<String, Map<String, Partition>> newSubMap = new HashMap<>();
            final Map<String, Map<String, Partition>> existedSubMap =
                    currentSubInfo.putIfAbsent(clientId, newSubMap);
            // keep whichever map is actually stored for the client
            cachedSubMap = (existedSubMap == null) ? newSubMap : existedSubMap;
        }
        long rebalanceId = TBaseConstants.META_VALUE_UNDEFINED;
        if (request.hasEvent()) {
            rebalanceId = request.getEvent().getRebalanceId();
        }
        // align the cached subscription with what the consumer reports
        if (request.getReportSubscribeInfo()) {
            final List<SubscribeInfo> reportedInfos =
                    DataConverterUtil.convertSubInfo(request.getSubscribeInfoList());
            if (!checkIfConsist(cachedSubMap, reportedInfos)) {
                cachedSubMap.clear();
                for (SubscribeInfo subInfo : reportedInfos) {
                    final String topicName = subInfo.getTopic();
                    Map<String, Partition> topicParts = cachedSubMap.get(topicName);
                    if (topicParts == null) {
                        topicParts = new HashMap<>();
                        cachedSubMap.put(topicName, topicParts);
                    }
                    final Partition reportedPart = new Partition(
                            subInfo.getPartition().getBroker(),
                            topicName, subInfo.getPartitionId());
                    topicParts.put(reportedPart.getPartitionKey(), reportedPart);
                }
                // only warn when the heartbeat does not carry a positive rebalance id
                if (rebalanceId <= 0) {
                    logger.warn(sBuffer.append("[Consistent Warn]").append(clientId)
                            .append(" sub info is not consistent with master.").toString());
                    sBuffer.setLength(0);
                }
            }
        }
        // the consumer reports a finished event: drop it from the head of its queue
        if (rebalanceId > 0) {
            logger.info(sBuffer.append("[Event Processed] rebalanceId=")
                    .append(rebalanceId)
                    .append(", clientId=").append(clientId).toString());
            sBuffer.setLength(0);
            try {
                groupInfo.settAllocated();
                consumerEventManager.removeFirst(clientId, sBuffer);
            } catch (Throwable e) {
                logger.warn("Unknown exception for remove first event:", e);
            }
        }
        // push the next queued event unless it is already being processed
        final ConsumerEvent nextEvent = consumerEventManager.peek(clientId);
        if (nextEvent != null
                && nextEvent.getStatus() != EventStatus.PROCESSING) {
            nextEvent.setStatus(EventStatus.PROCESSING);
            sBuffer.append("[Push Consumer Event]");
            logger.info(nextEvent.toStrBuilder(clientId, sBuffer).toString());
            sBuffer.setLength(0);
            rspBuilder.setEvent(EventProto.newBuilder()
                    .setRebalanceId(nextEvent.getRebalanceId())
                    .setOpType(nextEvent.getType().getValue())
                    .addAllSubscribeInfo(
                            DataConverterUtil.formatSubInfo(nextEvent.getSubscribeInfoList()))
                    .build());
        }
        // flow-control fields are only filled when the consumer sent a check id
        if (request.hasDefFlowCheckId()
                || request.hasGroupFlowCheckId()) {
            // start from the "undefined" defaults, then override what is enabled
            rspBuilder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED)
                    .setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED)
                    .setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED)
                    .setGroupFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED)
                    .setDefFlowControlInfo(" ")
                    .setGroupFlowControlInfo(" ");
            final ClusterSettingEntity clusterSetting =
                    defMetaDataService.getClusterDefSetting(false);
            final GroupResCtrlEntity groupCtrlEntity =
                    defMetaDataService.getGroupCtrlConf(groupName);
            if (clusterSetting.enableFlowCtrl()) {
                rspBuilder.setDefFlowCheckId(clusterSetting.getSerialId());
                // the rule text is only sent when the consumer's check id differs
                if (request.getDefFlowCheckId() != clusterSetting.getSerialId()) {
                    rspBuilder.setDefFlowControlInfo(clusterSetting.getGloFlowCtrlRuleInfo());
                }
            }
            if (groupCtrlEntity != null
                    && groupCtrlEntity.isFlowCtrlEnable()) {
                rspBuilder.setGroupFlowCheckId(groupCtrlEntity.getSerialId());
                rspBuilder.setQryPriorityId(groupCtrlEntity.getQryPriorityId());
                // the rule text is only sent when the consumer's check id differs
                if (request.getGroupFlowCheckId() != groupCtrlEntity.getSerialId()) {
                    rspBuilder.setGroupFlowControlInfo(groupCtrlEntity.getFlowCtrlInfo());
                }
            }
        }
        // all steps passed: complete the success response
        return rspBuilder
                .setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false))
                .setNotAllocated(groupInfo.isNotAllocate())
                .setSuccess(true)
                .setErrCode(TErrCodeConstants.SUCCESS)
                .setErrMsg("OK!")
                .build();
    }