in aliyun/log/consumer/heart_beat.py [0:0]
def run(self):
    """Run the heartbeat loop until ``self.shut_down_flag`` becomes true.

    Each round passes ``self.mheart_shards`` and an empty list to
    ``self.log_client.heartbeat``; the call is expected to fill that list
    in place and return a truthy value on success.  Afterwards, under
    ``self.lock``:

    * ``self.mheld_shards`` is replaced by the round's resulting shards;
    * ``self.mheart_shards`` becomes the de-duplicated union of its
      previous content and the resulting shards.

    The resulting shards are:

    * the server response, when the heartbeat succeeded;
    * an empty list, when the heartbeat failed and the last success is
      older than ``consumer_group_time_out + heartbeat_interval``;
    * the currently held shards, when the heartbeat failed more recently.

    Any exception raised during a round is logged and the loop continues.
    Rounds are spaced ``self.heartbeat_interval`` seconds apart (measured
    from the start of the round), including rounds that raised.
    """
    self.logger.info('heart beat start')
    while not self.shut_down_flag:
        # Taken before the ``try`` so the pacing below still works when the
        # round raises; otherwise a persistent failure would busy-loop.
        last_heatbeat_time = time.time()
        try:
            response_shards = []
            if self.log_client.heartbeat(self.mheart_shards, response_shards):
                self.last_hearbeat_successed_unixtime = time.time()
                self.logger.debug('heart beat result: %s get: %s',
                                  self.mheart_shards, response_shards)
                # The lists may differ only in order, so compare as sets
                # before reporting a reorganization.
                if self.mheart_shards != response_shards:
                    current_set = set(self.mheart_shards)
                    response_set = set(response_shards)
                    add_set = response_set - current_set
                    remove_set = current_set - response_set
                    if add_set or remove_set:
                        self.logger.info(
                            "shard reorganize, adding: %s, removing: %s",
                            add_set, remove_set)
            elif time.time() - self.last_hearbeat_successed_unixtime > \
                    (self.consumer_group_time_out + self.heartbeat_interval):
                # Failing for too long: drop anything the failed call may
                # have appended and give up all held shards.
                response_shards = []
                self.logger.info(
                    "Heart beat timeout, automatic reset consumer held shards")
            else:
                with self.lock:
                    response_shards = self.mheld_shards
                self.logger.info(
                    "Heart beat failed, Keep the held shards unchanged")

            with self.lock:
                self.mheart_shards = list(set(self.mheart_shards + response_shards))
                self.mheld_shards = response_shards
        except Exception as e:
            # The message needs a placeholder for ``e``; without one the
            # logging module fails to format the record and the real error
            # is lost.
            self.logger.warning("fail to heat beat: %s", e, exc_info=True)

        # Wait out the rest of the interval (original note: default sleep
        # is 2s from "LogHubConfig").  Sleep in slices of at most one second
        # so a shutdown request is noticed promptly.
        time_to_sleep = self.heartbeat_interval - (time.time() - last_heatbeat_time)
        while time_to_sleep > 0 and not self.shut_down_flag:
            time.sleep(min(time_to_sleep, 1))
            time_to_sleep = self.heartbeat_interval - (time.time() - last_heatbeat_time)

    self.logger.info('heart beat exit')