public void run()

in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java [1430:1561]


        public void run() {
            StringBuilder strBuffer = new StringBuilder(256);
            try {
                rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
                        consumerConfig.getPullProtectConfirmTimeoutMs());
                // print metric information
                clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
                // Fetch the rebalance result, construct message adn return it.
                ConsumerEvent event = rebalanceResults.poll();
                List<SubscribeInfo> subInfoList = null;
                boolean reportSubscribeInfo = false;
                if ((event != null)
                        || (++reportIntervalTimes >= consumerConfig.getMaxSubInfoReportIntvlTimes())) {
                    subInfoList =
                            rmtDataCache.getSubscribeInfoList(consumerId,
                                    consumerConfig.getConsumerGroup());
                    reportSubscribeInfo = true;
                    reportIntervalTimes = 0;
                }
                // Send heartbeat request to master
                ClientMaster.HeartResponseM2C response =
                        masterService.consumerHeartbeatC2M(
                                createMasterHeartbeatRequest(event, subInfoList, reportSubscribeInfo),
                                AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                // Process unsuccessful response
                if (response == null) {
                    clientStatsInfo.bookHB2MasterTimeout();
                    logger.error(strBuffer.append("[Heartbeat Failed] ")
                            .append("return result is null!").toString());
                    heartbeatRetryTimes++;
                    return;
                }
                if (!response.getSuccess()) {
                    // If master replies that cannot find current consumer node, re-register
                    if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
                        clientStatsInfo.bookHB2MasterTimeout();
                        try {
                            ClientMaster.RegisterResponseM2C regResponse =
                                    masterService.consumerRegisterC2M(createMasterRegisterRequest(),
                                            AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                            // Print the log when registration fails
                            if (regResponse == null || !regResponse.getSuccess()) {
                                if (regResponse == null) {
                                    logger.error(strBuffer.append("[Re-Register Failed] ")
                                            .append(consumerId)
                                            .append(" register to master return null!").toString());
                                } else {
                                    // If the consumer group is forbidden, output the log
                                    if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
                                        logger.error(strBuffer.append("[Re-Register Failed] ")
                                                .append(consumerId).append(" ConsumeGroup forbidden, ")
                                                .append(response.getErrMsg()).toString());
                                    } else {
                                        logger.error(strBuffer.append("[Re-Register Failed] ")
                                                .append(consumerId).append(" ")
                                                .append(response.getErrMsg()).toString());
                                    }
                                }
                                strBuffer.delete(0, strBuffer.length());
                            } else {
                                // Process the successful response. Record the response information,
                                // including control rules and latest auth token.
                                processRegisterAllocAndRspFlowRules(regResponse, strBuffer);
                                processRegAuthorizedToken(regResponse);
                                logger.info(strBuffer.append("[Re-register] ")
                                        .append(consumerId).toString());
                                strBuffer.delete(0, strBuffer.length());
                            }
                        } catch (Throwable e) {
                            strBuffer.delete(0, strBuffer.length());
                            logger.error(strBuffer.append("Register to master failed.")
                                    .append(e.getCause()).toString());
                            ThreadUtils.sleep(1000);
                        }
                        return;
                    }
                    clientStatsInfo.bookHB2MasterException();
                    logger.error(strBuffer.append("[Heartbeat Failed] ")
                            .append(response.getErrMsg()).toString());
                    if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                        adjustHeartBeatPeriod("certificate failure", strBuffer);
                    } else {
                        heartbeatRetryTimes++;
                    }
                    return;
                }
                // Process the heartbeat success response
                heartbeatRetryTimes = 0;
                // Get the authorization rules and update the local rules
                procHeartBeatRspAllocAndFlowRules(response, strBuffer);
                // Get the latest authorized token
                processHeartBeatAuthorizedToken(response);
                // Check if master requires to check authorization next time. If so, set the flag
                // and exchange the authorize information next time.
                if (response.hasRequireAuth()) {
                    rmtDataCache.storeMasterAuthRequire(response.getRequireAuth());
                }
                // Get the latest rebalance task
                ClientMaster.EventProto eventProto = response.getEvent();
                if ((eventProto != null) && (eventProto.getRebalanceId() > 0)) {
                    ConsumerEvent newEvent =
                            new ConsumerEvent(eventProto.getRebalanceId(),
                                    EventType.valueOf(eventProto.getOpType()),
                                    DataConverterUtil.convertSubInfo(eventProto.getSubscribeInfoList()),
                                    EventStatus.TODO);
                    rebalanceEvents.put(newEvent);
                    if (logger.isDebugEnabled()) {
                        strBuffer.append("[Receive Consumer Event]");
                        logger.debug(newEvent.toStrBuilder(consumerId, strBuffer).toString());
                        strBuffer.delete(0, strBuffer.length());
                    }
                }
                // Warning if heartbeat interval is too long
                long currentTime = System.currentTimeMillis();
                if ((currentTime - lastHeartbeatTime2Master) > consumerConfig.getHeartbeatPeriodMs() * 2) {
                    logger.warn(strBuffer.append(consumerId)
                            .append(" heartbeat interval to master is too long,please check! Total time : ")
                            .append(currentTime - lastHeartbeatTime2Master).toString());
                    strBuffer.delete(0, strBuffer.length());
                }
                lastHeartbeatTime2Master = currentTime;
            } catch (InterruptedException ee) {
                logger.info("To Master Heartbeat thread is interrupted,existed!");
            } catch (Throwable e) {
                // Print the log when meeting heartbeat errors.
                // Reduce the heartbeat request frequency when failure count exceed the threshold
                if (!isShutdown()) {
                    logger.error("Heartbeat failed,retry later.", e);
                }
                adjustHeartBeatPeriod("heartbeat exception", strBuffer);
            }
        }