protected FetchContext fetchMessage()

in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java [1201:1370]


    /**
     * Fetches messages from the broker for the selected partition and records the
     * outcome in a {@link FetchContext}.
     *
     * <p>A get-message request is sent to the partition's broker and the response is
     * handled according to its error code:
     * <ul>
     *   <li>{@code SUCCESS}: the payload is converted, {@code null} entries are dropped,
     *       and, when the topic has non-empty filter conditions, messages whose type is
     *       blank or not in the filter set are dropped too; the partition context is then
     *       updated and the remaining messages are stored in the returned context;</li>
     *   <li>{@code HB_NO_NODE}, {@code CERTIFICATE_FAILURE}, {@code DUPLICATE_PARTITION}:
     *       the partition is removed;</li>
     *   <li>{@code SERVER_CONSUME_SPEED_LIMIT}: the partition is released with the limit
     *       time carried by the response, or the configured message-not-found wait period
     *       when the response carries none;</li>
     *   <li>any other code: the partition is released with a code-dependent limit value.</li>
     * </ul>
     *
     * <p>Throwables raised by the RPC call or by the response processing are caught and
     * reported through the returned context instead of being propagated.
     *
     * @param partSelectResult the partition selection result the fetch is based on
     * @param strBuffer        scratch buffer used to build strings; it is cleared after
     *                         each use inside this method (callers presumably pass it in
     *                         empty - TODO confirm)
     * @return the fetch context holding either the fetched messages or the failure
     *         code and message
     */
    protected FetchContext fetchMessage(PartitionSelectResult partSelectResult,
            final StringBuilder strBuffer) {
        // Fetch task context based on selected partition
        FetchContext taskContext =
                new FetchContext(partSelectResult);
        Partition partition = taskContext.getPartition();
        String topic = partition.getTopic();
        String partitionKey = partition.getPartitionKey();
        long startTime = System.currentTimeMillis();
        // Response from broker
        ClientBroker.GetMessageResponseB2C msgRspB2C = null;
        try {
            msgRspB2C =
                    getBrokerService(partition.getBroker())
                            .getMessagesC2B(createBrokerGetMessageRequest(
                                    partition, taskContext.isLastConsumed()),
                                    AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
        } catch (Throwable ee) {
            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.UNSPECIFIED_ABNORMAL);
            // Process the exception
            rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
            taskContext.setFailProcessResult(400, strBuffer
                    .append("Get message error, reason is ")
                    .append(ee.toString()).toString());
            strBuffer.delete(0, strBuffer.length());
            return taskContext;
        }
        // Elapsed time of the RPC call only; response processing below is not included
        long dltTime = System.currentTimeMillis() - startTime;
        if (msgRspB2C == null) {
            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            rmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
            taskContext.setFailProcessResult(500, "Get message null");
            return taskContext;
        }
        try {
            // Read the response code once; every branch below works on this value
            final int errCode = msgRspB2C.getErrCode();
            // Process the response based on the return code
            switch (errCode) {
                case TErrCodeConstants.SUCCESS: {
                    int msgSize = 0;
                    int msgCount = 0;
                    // Convert the message payload data
                    List<Message> tmpMessageList =
                            DataConverterUtil.convertMessage(topic, msgRspB2C.getMessagesList());
                    boolean isEscLimit =
                            (msgRspB2C.hasEscFlowCtrl() && msgRspB2C.getEscFlowCtrl());
                    // Filtering applies only when the topic has a processor
                    // with a non-empty set of filter conditions
                    boolean needFilter = false;
                    Set<String> topicFilterSet = null;
                    TopicProcessor topicProcessor = consumeSubInfo.getTopicProcessor(topic);
                    if (topicProcessor != null) {
                        topicFilterSet = topicProcessor.getFilterConds();
                        if (topicFilterSet != null && !topicFilterSet.isEmpty()) {
                            needFilter = true;
                        }
                    }
                    // Keep the accepted messages and accumulate their count and payload size
                    List<Message> messageList = new ArrayList<>();
                    for (Message message : tmpMessageList) {
                        if (message == null) {
                            continue;
                        }
                        // Drop messages whose type is blank or not in the filter set
                        if (needFilter && (TStringUtils.isBlank(message.getMsgType())
                                || !topicFilterSet.contains(message.getMsgType()))) {
                            continue;
                        }
                        msgCount++;
                        messageList.add(message);
                        msgSize += message.getData().length;
                    }
                    // Optional response fields fall back to sentinel values when absent
                    long dataDltVal = msgRspB2C.hasCurrDataDlt()
                            ? msgRspB2C.getCurrDataDlt()
                            : -1;
                    long currOffset = msgRspB2C.hasCurrOffset()
                            ? msgRspB2C.getCurrOffset()
                            : TBaseConstants.META_VALUE_UNDEFINED;
                    long maxOffset = msgRspB2C.hasMaxOffset()
                            ? msgRspB2C.getMaxOffset()
                            : TBaseConstants.META_VALUE_UNDEFINED;
                    boolean isRequireSlow =
                            (msgRspB2C.hasRequireSlow() && msgRspB2C.getRequireSlow());
                    // Set the process result of current stage
                    rmtDataCache
                            .setPartitionContextInfo(partitionKey, currOffset, 1,
                                    errCode, isEscLimit, msgSize, 0,
                                    dataDltVal, isRequireSlow, maxOffset);
                    // The second argument is built as partitionKey + ATTR_SEP + used token
                    taskContext.setSuccessProcessResult(currOffset,
                            strBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
                                    .append(taskContext.getUsedToken()).toString(),
                            messageList, maxOffset);
                    strBuffer.delete(0, strBuffer.length());
                    clientStatsInfo.bookSuccGetMsg(dltTime,
                            topic, partitionKey, msgCount, msgSize);
                    break;
                }
                case TErrCodeConstants.HB_NO_NODE:
                case TErrCodeConstants.CERTIFICATE_FAILURE:
                case TErrCodeConstants.DUPLICATE_PARTITION: {
                    // Release the partitions when meeting these error codes
                    removePartition(partition);
                    taskContext.setFailProcessResult(errCode, msgRspB2C.getErrMsg());
                    break;
                }
                case TErrCodeConstants.SERVER_CONSUME_SPEED_LIMIT: {
                    // Process with server side speed limit: prefer the limit time carried
                    // by the response, otherwise use the configured not-found wait period
                    long defDltTime =
                            msgRspB2C.hasMinLimitTime()
                                    ? msgRspB2C.getMinLimitTime()
                                    : consumerConfig.getMsgNotFoundWaitPeriodMs();
                    rmtDataCache.errRspRelease(partitionKey, topic,
                            taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
                            0, errCode, false, 0,
                            defDltTime, isFilterConsume(topic), TBaseConstants.META_VALUE_UNDEFINED,
                            TBaseConstants.META_VALUE_UNDEFINED);
                    taskContext.setFailProcessResult(errCode, msgRspB2C.getErrMsg());
                    break;
                }
                case TErrCodeConstants.NOT_FOUND:
                case TErrCodeConstants.FORBIDDEN:
                case TErrCodeConstants.SERVICE_UNAVAILABLE:
                case TErrCodeConstants.MOVED:
                default: {
                    // Slow down the request based on the limitation configuration when meet these errors.
                    // Codes not listed in the inner switch keep the initial value of 300
                    // (presumably milliseconds, matching the not-found wait period - TODO confirm)
                    long limitDlt = 300;
                    switch (errCode) {
                        case TErrCodeConstants.FORBIDDEN: {
                            limitDlt = 2000;
                            break;
                        }
                        case TErrCodeConstants.SERVICE_UNAVAILABLE: {
                            limitDlt = 300;
                            break;
                        }
                        case TErrCodeConstants.MOVED: {
                            limitDlt = 200;
                            break;
                        }
                        case TErrCodeConstants.NOT_FOUND: {
                            limitDlt = consumerConfig.getMsgNotFoundWaitPeriodMs();
                            break;
                        }
                    }
                    rmtDataCache.errRspRelease(partitionKey, topic,
                            taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
                            0, errCode, false, 0,
                            limitDlt, isFilterConsume(topic), -1, TBaseConstants.META_VALUE_UNDEFINED);
                    taskContext.setFailProcessResult(errCode, msgRspB2C.getErrMsg());
                    break;
                }
            }
            // Every non-success response code is booked as a failed RPC call
            if (errCode != TErrCodeConstants.SUCCESS) {
                clientStatsInfo.bookFailRpcCall(errCode);
            }
            return taskContext;
        } catch (Throwable ee) {
            clientStatsInfo.bookFailRpcCall(TErrCodeConstants.INTERNAL_SERVER_ERROR);
            logger.error("Process response code error", ee);
            // NOTE(review): the partition is released through succRspRelease although
            // processing failed - confirm this is the intended release path
            rmtDataCache.succRspRelease(partitionKey, topic,
                    taskContext.getUsedToken(), false, isFilterConsume(topic),
                    TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
            // The buffer may still hold a partially built string if the failure happened
            // between an append and its matching delete above (e.g. inside
            // setSuccessProcessResult); clear it so the error message is not polluted
            strBuffer.delete(0, strBuffer.length());
            taskContext.setFailProcessResult(TErrCodeConstants.INTERNAL_SERVER_ERROR,
                    strBuffer.append("Get message failed,topic=")
                            .append(topic).append(",partition=").append(partition)
                            .append(", throw info is ").append(ee.toString()).toString());
            strBuffer.delete(0, strBuffer.length());
        }
        return taskContext;
    }