public SendMessageResponseB2P sendMessageP2B()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java [594:707]


    public SendMessageResponseB2P sendMessageP2B(SendMessageRequestP2B request,
            final String rmtAddress,
            boolean overtls) throws Throwable {
        ProcessResult result = new ProcessResult();
        final long startTime = System.currentTimeMillis();
        final StringBuilder strBuffer = new StringBuilder(512);
        SendMessageResponseB2P.Builder builder = SendMessageResponseB2P.newBuilder();
        builder.setSuccess(false);
        if (!this.started.get()
                || ServiceStatusHolder.isWriteServiceStop()) {
            builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE);
            builder.setErrMsg("Write StoreService temporary unavailable!");
            return builder.build();
        }
        if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData();
        // get and check clientId field
        if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        // get and check topicName and partitionId field
        final int partitionId = request.getPartitionId();
        if (!PBParameterUtils.getTopicNamePartIdInfo(true, request.getTopicName(),
                partitionId, this.metadataManager, strBuffer, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        final TopicMetadata topicMetadata = (TopicMetadata) result.getRetData();
        final String topicName = topicMetadata.getTopic();
        String msgType = null;
        int msgTypeCode = -1;
        if (TStringUtils.isNotBlank(request.getMsgType())) {
            msgType = request.getMsgType().trim();
            msgTypeCode = msgType.hashCode();
        }
        final byte[] msgData = request.getData().toByteArray();
        final int dataLength = msgData.length;
        if (dataLength <= 0) {
            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
            builder.setErrMsg("data length is zero!");
            return builder.build();
        }
        if (dataLength > topicMetadata.getMaxMsgSize()) {
            builder.setErrCode(TErrCodeConstants.BAD_REQUEST);
            builder.setErrMsg(strBuffer.append("data length over max length, allowed max length is ")
                    .append(topicMetadata.getMaxMsgSize())
                    .append(", data length is ").append(dataLength).toString());
            return builder.build();
        }
        int checkSum = CheckSum.crc32(msgData);
        if (request.getCheckSum() != -1 && checkSum != request.getCheckSum()) {
            builder.setErrCode(TErrCodeConstants.FORBIDDEN);
            builder.setErrMsg(strBuffer.append("Checksum msg data failure: ")
                    .append(request.getCheckSum()).append(" of ").append(topicName)
                    .append(" not equal to the data's checksum of ")
                    .append(checkSum).toString());
            return builder.build();
        }
        if (!serverAuthHandler.validProduceAuthorizeInfo(
                certifiedInfo.getUserName(), topicName, msgType, rmtAddress, result)) {
            builder.setErrCode(result.getErrCode());
            builder.setErrMsg(result.getErrMsg());
            return builder.build();
        }
        try {
            final MessageStore store =
                    this.storeManager.getOrCreateMessageStore(topicName, partitionId);
            final AppendResult appendResult = new AppendResult();
            if (store.appendMsg(appendResult, dataLength, checkSum, msgData,
                    msgTypeCode, request.getFlag(), partitionId, request.getSentAddr())) {
                String baseKey = strBuffer.append(topicName)
                        .append("#").append(AddressUtils.intToIp(request.getSentAddr()))
                        .append("#").append(tubeConfig.getHostName())
                        .append("#").append(request.getPartitionId())
                        .append("#").append(request.getMsgTime()).toString();
                putCounterGroup.add(baseKey, 1L, dataLength);
                AuditUtils.addProduceRecord(topicName,
                        request.getMsgType(), request.getMsgTime(), 1, dataLength);
                builder.setSuccess(true);
                builder.setRequireAuth(certifiedInfo.isReAuth());
                builder.setErrCode(TErrCodeConstants.SUCCESS);
                // begin Deprecated, after 1.0, the ErrMsg set "Ok" or ""
                builder.setErrMsg(String.valueOf(appendResult.getMsgId()));
                // end Deprecated, after 1.0, the ErrMsg set "Ok" or ""
                builder.setMessageId(appendResult.getMsgId());
                builder.setAppendTime(appendResult.getAppendTime());
                builder.setAppendOffset(appendResult.getAppendIndexOffset());
                BrokerSrvStatsHolder.updSendMsgLatency(System.currentTimeMillis() - startTime);
                return builder.build();
            } else {
                builder.setErrCode(TErrCodeConstants.SERVER_RECEIVE_OVERFLOW);
                builder.setErrMsg(strBuffer.append("Put message failed from ")
                        .append(tubeConfig.getHostName())
                        .append(", server receive message overflow!").toString());
                return builder.build();
            }
        } catch (final Throwable ex) {
            logger.error("Put message failed ", ex);
            strBuffer.delete(0, strBuffer.length());
            builder.setSuccess(false);
            builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            builder.setErrMsg(strBuffer.append("Put message failed from ")
                    .append(tubeConfig.getHostName()).append(" ")
                    .append((ex.getMessage() != null ? ex.getMessage() : " ")).toString());
            return builder.build();
        }
    }