in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java [1581:1705]
/**
 * Heartbeat loop between this consumer and the brokers it is registered with.
 *
 * <p>Until {@code isShutdown()} returns true, every round:
 * <ol>
 * <li>warns when the time elapsed since the previous round exceeds twice the configured
 * heartbeat period;</li>
 * <li>sends one heartbeat per broker returned by {@code rmtDataCache.getAllRegisterBrokers()},
 * carrying the partitions returned by {@code rmtDataCache.getBrokerPartitionList(brokerInfo)};</li>
 * <li>releases, via {@code removePartition}, the partitions the broker reports as failed, all
 * partitions of the broker on a certificate failure, and all partitions that were part of a
 * heartbeat round that threw;</li>
 * <li>records the round time in {@code lastHeartbeatTime2Broker} and sleeps for one heartbeat
 * period.</li>
 * </ol>
 *
 * <p>No exception escapes this method: failures are counted in {@code clientStatsInfo} and logged.
 */
public void run() {
    // Scratch buffer shared by every log statement below; it is emptied after each use.
    StringBuilder strBuffer = new StringBuilder(256);
    while (!isShutdown()) {
        try {
            // First check the last heartbeat interval. If it's larger than two periods,
            // there may be some system hang up (e.g. long time gc, CPU is too busy).
            // Print the warning message.
            long currentTime = System.currentTimeMillis();
            if ((currentTime - lastHeartbeatTime2Broker) > (consumerConfig.getHeartbeatPeriodMs() * 2)) {
                logger.warn(strBuffer.append(consumerId)
                        .append(" heartbeat interval to broker is too long,please check! Total time : ")
                        .append(currentTime - lastHeartbeatTime2Broker).toString());
                strBuffer.delete(0, strBuffer.length());
            }
            // Send heartbeat request to every broker this client is registered with.
            for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
                // Partition keys included in this heartbeat; also used by the catch block below
                // to release exactly the partitions that were part of the failed request.
                List<String> partStrSet = new ArrayList<String>();
                try {
                    List<Partition> partitions =
                            rmtDataCache.getBrokerPartitionList(brokerInfo);
                    if ((partitions == null) || partitions.isEmpty()) {
                        // Nothing to report to this broker.
                        continue;
                    }
                    for (Partition partition : partitions) {
                        partStrSet.add(partition.toString());
                    }
                    ClientBroker.HeartBeatResponseB2C heartBeatResponseV2 =
                            getBrokerService(brokerInfo).consumerHeartbeatC2B(
                                    createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                    AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                    // A null response is counted as a timeout.
                    if (heartBeatResponseV2 == null) {
                        clientStatsInfo.bookHB2BrokerTimeout();
                        continue;
                    }
                    if (!heartBeatResponseV2.getSuccess()) {
                        clientStatsInfo.bookHB2BrokerException();
                        // Only a certificate failure releases the broker's partitions here.
                        if (heartBeatResponseV2.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
                            for (Partition partition : partitions) {
                                removePartition(partition);
                            }
                            logger.warn(strBuffer
                                    .append("[heart2broker error] certificate failure, ")
                                    .append(brokerInfo.getBrokerStrInfo())
                                    .append("'s partitions area released, ")
                                    .append(heartBeatResponseV2.getErrMsg()).toString());
                            strBuffer.delete(0, strBuffer.length());
                        }
                        continue;
                    }
                    // If the peer requires authentication, set a flag.
                    // The following requests will attach the auth information.
                    rmtDataCache.bookBrokerRequireAuthInfo(
                            brokerInfo.getBrokerId(), heartBeatResponseV2);
                    if (!heartBeatResponseV2.getHasPartFailure()) {
                        continue;
                    }
                    // The broker reported failed partitions: release each one locally and log it.
                    // Every entry is handled in its own try block so that a single malformed
                    // entry cannot prevent the remaining failed partitions from being released.
                    for (String strFailInfo : heartBeatResponseV2.getFailureInfoList()) {
                        try {
                            // Entry layout used below: <errorCode><ATTR_SEP><partition key>.
                            final int index =
                                    strFailInfo.indexOf(TokenConstants.ATTR_SEP);
                            if (index < 0) {
                                logger.error(strBuffer
                                        .append("Parse Heartbeat response error : ")
                                        .append("invalid response, ")
                                        .append(strFailInfo).toString());
                                strBuffer.delete(0, strBuffer.length());
                                continue;
                            }
                            int errorCode =
                                    Integer.parseInt(strFailInfo.substring(0, index));
                            Partition failPartition =
                                    new Partition(strFailInfo.substring(index + 1));
                            removePartition(failPartition);
                            logger.warn(strBuffer
                                    .append("[heart2broker error] partition:")
                                    .append(failPartition.toString())
                                    .append(", errorCode=")
                                    .append(errorCode).toString());
                            strBuffer.delete(0, strBuffer.length());
                        } catch (Throwable ee) {
                            // Drop any partially built message before reusing the buffer.
                            strBuffer.delete(0, strBuffer.length());
                            if (!isShutdown()) {
                                // Pass the throwable so the stack trace is not lost.
                                logger.error(strBuffer
                                        .append("Parse Heartbeat response error :")
                                        .append(ee.getMessage())
                                        .append(", failure info: ")
                                        .append(strFailInfo).toString(), ee);
                                strBuffer.delete(0, strBuffer.length());
                            }
                        }
                    }
                } catch (Throwable ee) {
                    // The heartbeat to this broker failed: count it, sample-print the error and
                    // release every partition that was part of the request.
                    if (!isShutdown()) {
                        clientStatsInfo.bookHB2BrokerException();
                        samplePrintCtrl.printExceptionCaught(ee);
                        if (!partStrSet.isEmpty()) {
                            // Drop any partially built message before reusing the buffer.
                            strBuffer.delete(0, strBuffer.length());
                            for (String partitionStr : partStrSet) {
                                Partition tmpPartition = new Partition(partitionStr);
                                removePartition(tmpPartition);
                                logger.warn(strBuffer
                                        .append("[heart2broker Throwable] release partition:")
                                        .append(partitionStr).toString());
                                strBuffer.delete(0, strBuffer.length());
                            }
                        }
                    }
                }
            }
            // Wait for next heartbeat
            lastHeartbeatTime2Broker = System.currentTimeMillis();
            Thread.sleep(consumerConfig.getHeartbeatPeriodMs());
        } catch (Throwable e) {
            // An InterruptedException thrown by the sleep above also lands here. The interrupt
            // flag is intentionally not restored: the loop keeps running until isShutdown()
            // is true, and a set flag would make every following sleep fail immediately.
            clientStatsInfo.bookHB2BrokerException();
            lastHeartbeatTime2Broker = System.currentTimeMillis();
            if (!isShutdown()) {
                logger.error("heartbeat thread error 3 : ", e);
            }
        }
    }
}