in aliyun/log/consumer/shard_worker.py [0:0]
def fetch_data(self):
    """Collect the previous fetch task's result and schedule the next fetch.

    Returns immediately while a previously submitted fetch task is still
    running. Otherwise the result obtained from ``get_task_result`` is
    handled as follows:

    * Successful result (not ``None`` and carrying no exception): the
      fetched log groups, the next cursor and the size/count statistics are
      stored on the worker. If this fetch was empty and more than 30 seconds
      have passed since the last fetch that returned data, the checkpoint is
      flushed once (guarded by ``save_last_checkpoint``).
    * The result, whatever it is (possibly ``None``), is passed to
      ``_sample_log_error``.
    * No result or a successful one: a new ``consumer_fetch_task`` is
      submitted to the executor unless throttling decides it is too early,
      in which case ``fetch_data_future`` is cleared.
    * Failed result: ``fetch_data_future`` is cleared and nothing is
      submitted by this call.

    Raises:
        ClientWorkerException: if a result that carries no exception is not
            a ``FetchTaskResult``.
    """
    # A fetch is still in flight: nothing to collect, nothing to schedule.
    if not (self.fetch_data_future is None or self.fetch_data_future.done()):
        return

    task_result = self.get_task_result(self.fetch_data_future)

    # Task finished successfully: publish its output and advance the cursor.
    if task_result is not None and task_result.get_exception() is None:
        # Explicit raise instead of ``assert``: asserts are removed under
        # ``python -O``, which would let a wrong-typed result slip through.
        if not isinstance(task_result, FetchTaskResult):
            raise ClientWorkerException("fetch result type is not as expected")

        self.last_success_fetch_time = time.time()
        self.last_fetch_log_group = FetchedLogGroup(
            self.shard_id,
            task_result.get_fetched_log_group_list(),
            task_result.get_cursor(),
        )
        self.next_fetch_cursor = task_result.get_cursor()
        self.last_fetch_count = self.last_fetch_log_group.log_group_size
        self.last_fetch_size = task_result.get_raw_size()
        self.rawLogGroupCountBeforeQuery = task_result.get_raw_log_group_count_before_query()
        self.rawSizeBeforeQuery = task_result.get_raw_size_before_query()

        if self.last_fetch_count > 0:
            self.last_success_fetch_time_with_data = time.time()
            # New data arrived, so a later idle period may flush again.
            self.save_last_checkpoint = False
        elif (self.last_success_fetch_time_with_data != 0
              and time.time() - self.last_success_fetch_time_with_data > 30
              and not self.save_last_checkpoint):
            # Empty fetches for over 30 s since the last data: flush the
            # checkpoint once; the flag prevents repeating it on every call.
            self.checkpoint_tracker.flush_check_point()
            self.save_last_checkpoint = True

    self._sample_log_error(task_result)

    # The finished task failed: drop the future and submit nothing now.
    if not (task_result is None or task_result.get_exception() is None):
        self.fetch_data_future = None
        return

    # No task or task is done: decide whether to create a new one.
    fetch_size = self.last_fetch_size
    fetch_count = self.last_fetch_count
    if self.query:
        # With a query set, throttle on the pre-query raw statistics.
        fetch_size = self.rawSizeBeforeQuery
        fetch_count = self.rawLogGroupCountBeforeQuery

    # Throttling control, similar to Java's SDK: the smaller the previous
    # fetch, the longer the minimum interval (seconds) before the next one.
    # Each tier is (size limit, log group count limit, minimum interval);
    # the first matching tier wins. Size units are presumably bytes - confirm.
    throttle_tiers = (
        (1024 * 1024, 100, 0.5),
        (2 * 1024 * 1024, 500, 0.2),
        (4 * 1024 * 1024, 1000, 0.05),
    )
    is_generate_fetch_task = True
    for size_limit, count_limit, min_interval in throttle_tiers:
        if (fetch_size < size_limit and fetch_count < count_limit
                and fetch_count < self.max_fetch_log_group_size):
            is_generate_fetch_task = (time.time() - self.last_fetch_time) > min_interval
            break

    if not is_generate_fetch_task:
        # Too soon after the previous fetch; try again on a later call.
        self.fetch_data_future = None
        return

    self.last_fetch_time = time.time()
    self.fetch_data_future = self.executor.submit(
        consumer_fetch_task,
        self.log_client, self.shard_id, self.next_fetch_cursor,
        max_fetch_log_group_size=self.max_fetch_log_group_size,
        end_cursor=self.fetch_end_cursor, query=self.query)