public void run()

in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java [1581:1705]


        /**
         * Heartbeat loop between this consumer and every broker it is registered to.
         *
         * <p>Runs until {@code isShutdown()} returns true. Each round:
         * <ol>
         *   <li>warns if the time since the previous round exceeds two heartbeat periods;</li>
         *   <li>sends one heartbeat per registered broker carrying that broker's partitions;</li>
         *   <li>removes partitions the broker reports as failed, all partitions of a broker
         *       that answers with {@code CERTIFICATE_FAILURE}, and all partitions of a broker
         *       whose heartbeat throws;</li>
         *   <li>records the round's finish time and sleeps for one heartbeat period.</li>
         * </ol>
         *
         * <p>Any throwable escaping a round (including an interrupted sleep) is counted and
         * logged unless the consumer is shutting down, and the loop then re-checks
         * {@code isShutdown()}.
         */
        public void run() {
            // Single buffer reused for every log line; it is emptied after each use.
            StringBuilder strBuffer = new StringBuilder(256);
            while (!isShutdown()) {
                try {
                    // First check the last heartbeat interval. If it's larger than two periods,
                    // there may be some system hang up (e.g. long time gc, CPU is too busy).
                    // Print the warning message.
                    long currentTime = System.currentTimeMillis();
                    if ((currentTime - lastHeartbeatTime2Broker) > (consumerConfig.getHeartbeatPeriodMs() * 2)) {
                        logger.warn(strBuffer.append(consumerId)
                                .append(" heartbeat interval to broker is too long,please check! Total time : ")
                                .append(currentTime - lastHeartbeatTime2Broker).toString());
                        strBuffer.delete(0, strBuffer.length());
                    }
                    // Send a heartbeat request to every broker this client is registered to
                    for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
                        // Declared outside the try so the catch below knows which partitions
                        // were part of the failed heartbeat.
                        List<String> partStrSet = new ArrayList<String>();
                        try {
                            List<Partition> partitions =
                                    rmtDataCache.getBrokerPartitionList(brokerInfo);
                            // Nothing to report for a broker without partitions
                            if ((partitions == null) || partitions.isEmpty()) {
                                continue;
                            }
                            for (Partition partition : partitions) {
                                partStrSet.add(partition.toString());
                            }
                            ClientBroker.HeartBeatResponseB2C heartBeatResponseV2 =
                                    getBrokerService(brokerInfo).consumerHeartbeatC2B(
                                            createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                            AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                            // A null response is booked as a heartbeat timeout
                            if (heartBeatResponseV2 == null) {
                                clientStatsInfo.bookHB2BrokerTimeout();
                                continue;
                            }
                            if (!heartBeatResponseV2.getSuccess()) {
                                clientStatsInfo.bookHB2BrokerException();
                                // On certificate failure release every partition of this broker
                                if (heartBeatResponseV2.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                                    for (Partition partition : partitions) {
                                        removePartition(partition);
                                    }
                                    logger.warn(strBuffer
                                            .append("[heart2broker error] certificate failure, ")
                                            .append(brokerInfo.getBrokerStrInfo())
                                            .append("'s partitions area released, ")
                                            .append(heartBeatResponseV2.getErrMsg()).toString());
                                    strBuffer.delete(0, strBuffer.length());
                                }
                                continue;
                            }
                            // If the peer require authentication, set a flag.
                            // The following request will attach the auth information.
                            rmtDataCache.bookBrokerRequireAuthInfo(
                                    brokerInfo.getBrokerId(), heartBeatResponseV2);
                            // If the heartbeat response report failed partitions, release the
                            // corresponding local partition and log the operation
                            if (heartBeatResponseV2.getHasPartFailure()) {
                                releaseFailedPartitions(
                                        heartBeatResponseV2.getFailureInfoList(), strBuffer);
                            }
                        } catch (Throwable ee) {
                            // If there's error in the heartbeat, collect the log and print out.
                            // Release the log string buffer.
                            if (!isShutdown()) {
                                clientStatsInfo.bookHB2BrokerException();
                                samplePrintCtrl.printExceptionCaught(ee);
                                if (!partStrSet.isEmpty()) {
                                    // The buffer may hold a half-built message from the failed path
                                    strBuffer.delete(0, strBuffer.length());
                                    for (String partitionStr : partStrSet) {
                                        Partition tmpPartition = new Partition(partitionStr);
                                        removePartition(tmpPartition);
                                        logger.warn(strBuffer
                                                .append("[heart2broker Throwable] release partition:")
                                                .append(partitionStr).toString());
                                        strBuffer.delete(0, strBuffer.length());
                                    }
                                }
                            }
                        }
                    }
                    // Wait for next heartbeat
                    lastHeartbeatTime2Broker = System.currentTimeMillis();
                    Thread.sleep(consumerConfig.getHeartbeatPeriodMs());
                } catch (Throwable e) {
                    clientStatsInfo.bookHB2BrokerException();
                    // Reset the timestamp so the next round does not also report a long interval
                    lastHeartbeatTime2Broker = System.currentTimeMillis();
                    if (!isShutdown()) {
                        logger.error("heartbeat thread error 3 : ", e);
                    }
                }
            }
        }

        /**
         * Removes the partitions a broker reported as failed in a heartbeat response.
         *
         * <p>Each entry is expected to look like
         * {@code <errorCode><TokenConstants.ATTR_SEP><partition string>}. Entries without the
         * separator are logged and skipped. Every entry is handled in its own try/catch, so a
         * single malformed entry (for example a non-numeric error code) no longer prevents the
         * remaining failed partitions from being released.
         *
         * @param strFailInfoList failure entries taken from the heartbeat response
         * @param strBuffer       reusable log buffer; left empty when this method returns
         */
        private void releaseFailedPartitions(List<String> strFailInfoList, StringBuilder strBuffer) {
            for (String strFailInfo : strFailInfoList) {
                try {
                    final int index = strFailInfo.indexOf(TokenConstants.ATTR_SEP);
                    if (index < 0) {
                        logger.error(strBuffer
                                .append("Parse Heartbeat response error : ")
                                .append("invalid response, ")
                                .append(strFailInfo).toString());
                        strBuffer.delete(0, strBuffer.length());
                        continue;
                    }
                    int errorCode = Integer.parseInt(strFailInfo.substring(0, index));
                    Partition failPartition = new Partition(strFailInfo.substring(index + 1));
                    removePartition(failPartition);
                    logger.warn(strBuffer
                            .append("[heart2broker error] partition:")
                            .append(failPartition.toString())
                            .append(", errorCode=")
                            .append(errorCode).toString());
                    strBuffer.delete(0, strBuffer.length());
                } catch (Throwable ee) {
                    if (!isShutdown()) {
                        // Drop any half-built message before composing the error line
                        strBuffer.delete(0, strBuffer.length());
                        // Pass the throwable too: getMessage() alone is often null and
                        // loses the stack trace.
                        logger.error(strBuffer
                                .append("Parse Heartbeat response error :")
                                .append(ee.getMessage()).toString(), ee);
                        strBuffer.delete(0, strBuffer.length());
                    }
                }
            }
        }