def fetch_data()

in aliyun/log/consumer/shard_worker.py [0:0]


    def fetch_data(self):
        """Collect a finished fetch task (if any) and schedule the next fetch.

        Does nothing while a previously submitted fetch is still running.
        Otherwise:

        * On a successful result, records the fetched log groups, the next
          cursor and the size/count statistics used for throttling. If the
          fetch was empty and the last non-empty fetch happened more than
          30 seconds ago, flushes the checkpoint tracker once (re-armed when
          data arrives again).
        * Passes the result (possibly ``None``) to ``_sample_log_error``.
        * If the previous task failed, resets ``fetch_data_future`` to
          ``None`` without submitting anything.
        * Otherwise (no previous task, or it succeeded), submits a new
          ``consumer_fetch_task`` to the executor unless the throttling rules
          say it is too soon, in which case ``fetch_data_future`` is reset to
          ``None`` so the next call re-enters this scheduling logic.

        Raises:
            ClientWorkerException: if a successful task result is not a
                ``FetchTaskResult``.
        """
        # a fetch is still in flight; nothing to do until it completes
        if self.fetch_data_future is not None and not self.fetch_data_future.done():
            return

        task_result = self.get_task_result(self.fetch_data_future)
        failed = task_result is not None and task_result.get_exception() is not None

        # task finished successfully: record results and the next cursor
        if task_result is not None and not failed:
            # explicit raise instead of `assert`: asserts are stripped under
            # `python -O`, and the assert never raised this exception type anyway
            if not isinstance(task_result, FetchTaskResult):
                raise ClientWorkerException("fetch result type is not as expected")

            now = time.time()
            self.last_success_fetch_time = now

            self.last_fetch_log_group = FetchedLogGroup(self.shard_id, task_result.get_fetched_log_group_list(),
                                                        task_result.get_cursor())
            self.next_fetch_cursor = task_result.get_cursor()
            self.last_fetch_count = self.last_fetch_log_group.log_group_size
            self.last_fetch_size = task_result.get_raw_size()
            self.rawLogGroupCountBeforeQuery = task_result.get_raw_log_group_count_before_query()
            self.rawSizeBeforeQuery = task_result.get_raw_size_before_query()

            if self.last_fetch_count > 0:
                self.last_success_fetch_time_with_data = now
                self.save_last_checkpoint = False
            elif (self.last_success_fetch_time_with_data != 0
                  and now - self.last_success_fetch_time_with_data > 30
                  and not self.save_last_checkpoint):
                # no data for >30s since the last non-empty fetch: flush once
                self.checkpoint_tracker.flush_check_point()
                self.save_last_checkpoint = True

        self._sample_log_error(task_result)

        # previous fetch raised: drop the future without scheduling a new one
        if failed:
            self.fetch_data_future = None
            return

        # throttling control, similar to the Java SDK: the smaller the previous
        # batch, the longer the minimum gap before issuing the next request
        fetch_size = self.last_fetch_size
        fetch_count = self.last_fetch_count
        if self.query:
            # with a query configured, throttle on the raw (pre-query) volume
            fetch_size = self.rawSizeBeforeQuery
            fetch_count = self.rawLogGroupCountBeforeQuery

        now = time.time()
        elapsed = now - self.last_fetch_time
        below_max = fetch_count < self.max_fetch_log_group_size
        if fetch_size < 1024 * 1024 and fetch_count < 100 and below_max:
            should_fetch = elapsed > 0.5
        elif fetch_size < 2 * 1024 * 1024 and fetch_count < 500 and below_max:
            should_fetch = elapsed > 0.2
        elif fetch_size < 4 * 1024 * 1024 and fetch_count < 1000 and below_max:
            should_fetch = elapsed > 0.05
        else:
            should_fetch = True

        if should_fetch:
            self.last_fetch_time = now
            self.fetch_data_future = \
                self.executor.submit(consumer_fetch_task,
                                     self.log_client, self.shard_id, self.next_fetch_cursor,
                                     max_fetch_log_group_size=self.max_fetch_log_group_size,
                                     end_cursor=self.fetch_end_cursor, query=self.query)
        else:
            self.fetch_data_future = None