def run()

in aliyun/log/consumer/worker.py [0:0]


    def run(self):
        self.logger.info('consumer worker "{0}" start '.format(self.option.consumer_name))
        self.heart_beat.start()

        while not self.shut_down_flag:
            held_shards = self.heart_beat.get_held_shards()

            last_fetch_time = time.time()
            for shard in held_shards:
                if self.shut_down_flag:
                    break

                shard_consumer = self._get_shard_consumer(shard)
                if shard_consumer is None:  # error when init consumer. shutdown directly
                    self.shutdown()
                    break

                shard_consumer.consume()

            self.clean_shard_consumer(held_shards)

            if self._need_stop():
                self.logger.info("all owned shards complete the tasks, owned shards: {0}".format(self.shard_consumers))
                self.shutdown()

            time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)
            while time_to_sleep > 0 and not self.shut_down_flag:
                time.sleep(min(time_to_sleep, 1))
                time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)

        # # stopping worker, need to cleanup all existing shard consumer
        self.logger.info('consumer worker "{0}" try to cleanup consumers'.format(self.option.consumer_name))
        self.shutdown_and_wait()

        if self.own_executor:
            self.logger.info('consumer worker "{0}" try to shutdown executors'.format(self.option.consumer_name))
            self._executor.shutdown()
            self.logger.info('consumer worker "{0}" stopped'.format(self.option.consumer_name))
        else:
            self.logger.info('executor is shared, consumer worker "{0}" stopped'.format(self.option.consumer_name))