in aliyun/log/consumer/worker.py [0:0]
def run(self):
self.logger.info('consumer worker "{0}" start '.format(self.option.consumer_name))
self.heart_beat.start()
while not self.shut_down_flag:
held_shards = self.heart_beat.get_held_shards()
last_fetch_time = time.time()
for shard in held_shards:
if self.shut_down_flag:
break
shard_consumer = self._get_shard_consumer(shard)
if shard_consumer is None: # error when init consumer. shutdown directly
self.shutdown()
break
shard_consumer.consume()
self.clean_shard_consumer(held_shards)
if self._need_stop():
self.logger.info("all owned shards complete the tasks, owned shards: {0}".format(self.shard_consumers))
self.shutdown()
time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)
while time_to_sleep > 0 and not self.shut_down_flag:
time.sleep(min(time_to_sleep, 1))
time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)
# # stopping worker, need to cleanup all existing shard consumer
self.logger.info('consumer worker "{0}" try to cleanup consumers'.format(self.option.consumer_name))
self.shutdown_and_wait()
if self.own_executor:
self.logger.info('consumer worker "{0}" try to shutdown executors'.format(self.option.consumer_name))
self._executor.shutdown()
self.logger.info('consumer worker "{0}" stopped'.format(self.option.consumer_name))
else:
self.logger.info('executor is shared, consumer worker "{0}" stopped'.format(self.option.consumer_name))