in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java [1430:1561]
public void run() {
StringBuilder strBuffer = new StringBuilder(256);
try {
rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
consumerConfig.getPullProtectConfirmTimeoutMs());
// print metric information
clientStatsInfo.selfPrintStatsInfo(false, true, strBuffer);
// Fetch the rebalance result, construct message adn return it.
ConsumerEvent event = rebalanceResults.poll();
List<SubscribeInfo> subInfoList = null;
boolean reportSubscribeInfo = false;
if ((event != null)
|| (++reportIntervalTimes >= consumerConfig.getMaxSubInfoReportIntvlTimes())) {
subInfoList =
rmtDataCache.getSubscribeInfoList(consumerId,
consumerConfig.getConsumerGroup());
reportSubscribeInfo = true;
reportIntervalTimes = 0;
}
// Send heartbeat request to master
ClientMaster.HeartResponseM2C response =
masterService.consumerHeartbeatC2M(
createMasterHeartbeatRequest(event, subInfoList, reportSubscribeInfo),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Process unsuccessful response
if (response == null) {
clientStatsInfo.bookHB2MasterTimeout();
logger.error(strBuffer.append("[Heartbeat Failed] ")
.append("return result is null!").toString());
heartbeatRetryTimes++;
return;
}
if (!response.getSuccess()) {
// If master replies that cannot find current consumer node, re-register
if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
clientStatsInfo.bookHB2MasterTimeout();
try {
ClientMaster.RegisterResponseM2C regResponse =
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// Print the log when registration fails
if (regResponse == null || !regResponse.getSuccess()) {
if (regResponse == null) {
logger.error(strBuffer.append("[Re-Register Failed] ")
.append(consumerId)
.append(" register to master return null!").toString());
} else {
// If the consumer group is forbidden, output the log
if (response.getErrCode() == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
logger.error(strBuffer.append("[Re-Register Failed] ")
.append(consumerId).append(" ConsumeGroup forbidden, ")
.append(response.getErrMsg()).toString());
} else {
logger.error(strBuffer.append("[Re-Register Failed] ")
.append(consumerId).append(" ")
.append(response.getErrMsg()).toString());
}
}
strBuffer.delete(0, strBuffer.length());
} else {
// Process the successful response. Record the response information,
// including control rules and latest auth token.
processRegisterAllocAndRspFlowRules(regResponse, strBuffer);
processRegAuthorizedToken(regResponse);
logger.info(strBuffer.append("[Re-register] ")
.append(consumerId).toString());
strBuffer.delete(0, strBuffer.length());
}
} catch (Throwable e) {
strBuffer.delete(0, strBuffer.length());
logger.error(strBuffer.append("Register to master failed.")
.append(e.getCause()).toString());
ThreadUtils.sleep(1000);
}
return;
}
clientStatsInfo.bookHB2MasterException();
logger.error(strBuffer.append("[Heartbeat Failed] ")
.append(response.getErrMsg()).toString());
if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
adjustHeartBeatPeriod("certificate failure", strBuffer);
} else {
heartbeatRetryTimes++;
}
return;
}
// Process the heartbeat success response
heartbeatRetryTimes = 0;
// Get the authorization rules and update the local rules
procHeartBeatRspAllocAndFlowRules(response, strBuffer);
// Get the latest authorized token
processHeartBeatAuthorizedToken(response);
// Check if master requires to check authorization next time. If so, set the flag
// and exchange the authorize information next time.
if (response.hasRequireAuth()) {
rmtDataCache.storeMasterAuthRequire(response.getRequireAuth());
}
// Get the latest rebalance task
ClientMaster.EventProto eventProto = response.getEvent();
if ((eventProto != null) && (eventProto.getRebalanceId() > 0)) {
ConsumerEvent newEvent =
new ConsumerEvent(eventProto.getRebalanceId(),
EventType.valueOf(eventProto.getOpType()),
DataConverterUtil.convertSubInfo(eventProto.getSubscribeInfoList()),
EventStatus.TODO);
rebalanceEvents.put(newEvent);
if (logger.isDebugEnabled()) {
strBuffer.append("[Receive Consumer Event]");
logger.debug(newEvent.toStrBuilder(consumerId, strBuffer).toString());
strBuffer.delete(0, strBuffer.length());
}
}
// Warning if heartbeat interval is too long
long currentTime = System.currentTimeMillis();
if ((currentTime - lastHeartbeatTime2Master) > consumerConfig.getHeartbeatPeriodMs() * 2) {
logger.warn(strBuffer.append(consumerId)
.append(" heartbeat interval to master is too long,please check! Total time : ")
.append(currentTime - lastHeartbeatTime2Master).toString());
strBuffer.delete(0, strBuffer.length());
}
lastHeartbeatTime2Master = currentTime;
} catch (InterruptedException ee) {
logger.info("To Master Heartbeat thread is interrupted,existed!");
} catch (Throwable e) {
// Print the log when meeting heartbeat errors.
// Reduce the heartbeat request frequency when failure count exceed the threshold
if (!isShutdown()) {
logger.error("Heartbeat failed,retry later.", e);
}
adjustHeartBeatPeriod("heartbeat exception", strBuffer);
}
}