public GetMessageResponseB2C getMessagesC2B()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java [279:422]


    /**
     * Handles a consumer get-message request and builds the matching response.
     *
     * <p>The response starts out as a failure (success=false, currOffset=-1,
     * escFlowCtrl=false, currDataDlt=-1, minLimitTime=0). The request is then
     * validated step by step, and each failed step returns immediately with its
     * own error code and message:
     * <ol>
     *   <li>broker started and read service not stopped;</li>
     *   <li>clientId, groupName and topicName parameter checks;</li>
     *   <li>a consumer is registered for the group/topic/partition and it is
     *       the requesting client;</li>
     *   <li>the consumer's heartbeat node can be refreshed;</li>
     *   <li>the topic is not in a closed status;</li>
     *   <li>messages are read from the partition's message store.</li>
     * </ol>
     *
     * @param request    the get-message request sent by the consumer
     * @param rmtAddress remote address of the caller; not read by this method
     * @param overtls    TLS flag of the call; not read by this method
     * @return the built response, successful only when the store read succeeded
     * @throws Throwable declared by the signature; any Throwable raised while
     *                   obtaining the store or reading messages is caught and
     *                   reported as an INTERNAL_SERVER_ERROR response instead
     */
    public GetMessageResponseB2C getMessagesC2B(GetMessageRequestC2B request,
            final String rmtAddress,
            boolean overtls) throws Throwable {
        final long startTime = System.currentTimeMillis();
        final GetMessageResponseB2C.Builder builder =
                GetMessageResponseB2C.newBuilder();
        // Pre-fill the response as a failure; only the success path overrides these.
        builder.setSuccess(false);
        builder.setCurrOffset(-1);
        builder.setEscFlowCtrl(false);
        builder.setCurrDataDlt(-1);
        builder.setMinLimitTime(0);
        if (!this.started.get()
                || ServiceStatusHolder.isReadServiceStop()) {
            builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
            builder.setErrMsg("Read StoreService temporary unavailable!");
            return builder.build();
        }
        // result carries either the validated value (retData) or the error
        // code/message of the last parameter check.
        ProcessResult result = new ProcessResult();
        StringBuilder strBuffer = new StringBuilder(512);
        // get and check clientId field
        if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final String clientId = (String) result.getRetData();
        // get and check groupName field
        if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final String groupName = (String) result.getRetData();
        // get and check topicName field
        if (!PBParameterUtils.getTopicNameParameter(request.getTopicName(),
                this.metadataManager, strBuffer, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        // get consumer info
        final String topicName = (String) result.getRetData();
        final int partitionId = request.getPartitionId();
        // An absent escFlowCtrl field is treated the same as false.
        boolean isEscFlowCtrl = request.hasEscFlowCtrl() && request.getEscFlowCtrl();
        String partStr = getPartStr(groupName, topicName, partitionId);
        String consumerId = null;
        ConsumerNodeInfo consumerNodeInfo = consumerRegisterMap.get(partStr);
        if (consumerNodeInfo != null) {
            consumerId = consumerNodeInfo.getConsumerId();
        }
        // No registration entry, or an entry without a consumer id: reject.
        if (consumerId == null) {
            logger.warn(strBuffer.append("[UnRegistered Consumer]").append(clientId)
                    .append(TokenConstants.SEGMENT_SEP).append(partStr).toString());
            // Reset the buffer so the client message does not contain the log text.
            strBuffer.delete(0, strBuffer.length());
            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
            builder.setErrMsg(strBuffer.append("UnRegistered Consumer:")
                    .append(clientId)
                    .append(", you have to register firstly!").toString());
            return builder.build();
        }
        // The partition is registered to a different consumer than the caller.
        if (!clientId.equals(consumerId)) {
            strBuffer.append("[Duplicated Request] Partition=").append(partStr)
                    .append(" of Broker=").append(tubeConfig.getBrokerId())
                    .append(" has been consumed by ").append(consumerId)
                    .append(";Current consumer ").append(clientId);
            logger.warn(strBuffer.toString());
            builder.setErrCode(TErrCodeConstants.DUPLICATE_PARTITION);
            builder.setErrMsg(strBuffer.toString());
            return builder.build();
        }
        String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
        // Refresh the consumer's heartbeat node; a HeartbeatException is
        // reported to the client as HB_NO_NODE with the exception's message.
        try {
            heartbeatManager.updConsumerNode(getHeartbeatNodeId(clientId, partStr));
        } catch (HeartbeatException e) {
            logger.warn(strBuffer.append("[Invalid Request]").append(clientId)
                    .append(TokenConstants.SEGMENT_SEP).append(topicName)
                    .append(TokenConstants.ATTR_SEP).append(partitionId).toString());
            builder.setErrCode(TErrCodeConstants.HB_NO_NODE);
            builder.setErrMsg(e.getMessage());
            return builder.build();
        }
        // A null status id means no closed status is recorded for the topic;
        // only statuses above STATUS_TOPIC_SOFT_DELETE forbid reading.
        Integer topicStatusId = this.metadataManager.getClosedTopicStatusId(topicName);
        if ((topicStatusId != null)
                && (topicStatusId > TStatusConstants.STATUS_TOPIC_SOFT_DELETE)) {
            strBuffer.append("[Partition Closed] Partition has been closed, for topic=")
                    .append(topicName).append(",partitionId=").append(partitionId)
                    .append(" of Broker=").append(tubeConfig.getBrokerId());
            logger.warn(strBuffer.toString());
            builder.setErrCode(TErrCodeConstants.FORBIDDEN);
            builder.setErrMsg(strBuffer.toString());
            return builder.build();
        }
        // query data from store manager.
        // isGetStore tells the catch block whether the failure happened while
        // obtaining the store (false) or while reading from it (true).
        boolean isGetStore = false;
        MessageStore dataStore = null;
        try {
            dataStore = this.storeManager.getOrCreateMessageStore(topicName, partitionId);
            isGetStore = true;
            GetMessageResult msgResult =
                    getMessages(dataStore, consumerNodeInfo, groupName, topicName, partitionId,
                            request.getLastPackConsumed(), request.getManualCommitOffset(),
                            clientId, this.tubeConfig.getHostName(), rmtAddrInfo, isEscFlowCtrl, strBuffer);
            if (msgResult.isSuccess) {
                long endTime = System.currentTimeMillis();
                // Record the outcome on the consumer node and in the counters
                // before filling the success response.
                consumerNodeInfo.setLastProcInfo(endTime,
                        msgResult.lastRdDataOffset, msgResult.totalMsgSize);
                getCounterGroup.add(msgResult.tmpCounters);
                AuditUtils.addConsumeRecord(msgResult.tmpCounters);
                builder.setEscFlowCtrl(false);
                builder.setRequireSlow(msgResult.isSlowFreq);
                builder.setSuccess(true);
                builder.setErrCode(TErrCodeConstants.SUCCESS);
                builder.setCurrOffset(msgResult.reqOffset);
                builder.setCurrDataDlt(msgResult.waitTime);
                builder.setErrMsg("OK!");
                builder.addAllMessages(msgResult.transferedMessageList);
                builder.setMaxOffset(msgResult.getMaxOffset());
                // Latency is only reported for successful reads.
                BrokerSrvStatsHolder.updGetMsgLatency(endTime - startTime);
                return builder.build();
            } else {
                builder.setErrCode(msgResult.getRetCode());
                builder.setErrMsg(msgResult.getErrInfo());
                // On failure waitTime is returned as minLimitTime, cast to int.
                builder.setMinLimitTime((int) msgResult.waitTime);
                return builder.build();
            }
        } catch (Throwable ee) {
            // Drop whatever the read path left in the shared buffer.
            strBuffer.delete(0, strBuffer.length());
            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            if (isGetStore) {
                strBuffer.append("[GetMessage] Throwable error while getMessage,")
                        .append(ee.getMessage()).append(", position is")
                        .append(this.tubeConfig.getBrokerId())
                        .append(TokenConstants.ATTR_SEP).append(topicName)
                        .append(TokenConstants.ATTR_SEP).append(partitionId);
                logger.error(strBuffer.toString(), ee);
                // Prefer the exception's own message; fall back to the log text.
                builder.setErrMsg(ee.getMessage() == null ? strBuffer.toString() : ee.getMessage());
            } else {
                strBuffer.append("Get the store of topic ")
                        .append(topicName).append(" in partition ")
                        .append(partitionId).append(" failure!");
                // Log the cause here: the client only receives the generic
                // message below, so without this the reason the store could
                // not be obtained would be lost.
                logger.error(strBuffer.toString(), ee);
                builder.setErrMsg(strBuffer.toString());
            }
            return builder.build();
        }
    }