in aliyun/log/consumer/worker.py [0:0]
def _need_stop(self):
"""
check if need to stop:
1. end_cursor has been hit and there's no more shard assinged (wait for heatbeat_interval * 3)
:return:
"""
if not self.option.cursor_end_time:
return False
all_finish = True
for shard, consumer in self.shard_consumers.items():
if consumer.is_shutdown():
continue
# has not yet do any successful fetch yet or get some data
if consumer.last_success_fetch_time == 0 or consumer.last_fetch_count > 0:
return False
# init self.last_owned_consumer_finish_time if it's None
if all_finish and self.last_owned_consumer_finish_time == 0:
self.last_owned_consumer_finish_time = time.time()
if abs(time.time() - self.last_owned_consumer_finish_time) >= \
self.option.consumer_group_time_out + self.option.heartbeat_interval:
return True
return False