in aliyun/log/consumer/tasks.py [0:0]
def consumer_fetch_task(loghub_client_adapter, shard_id, cursor, max_fetch_log_group_size=1000, end_cursor=None, query=None):
exception = None
for retry_times in range(3):
try:
response = loghub_client_adapter.pull_logs(shard_id, cursor, count=max_fetch_log_group_size, end_cursor=end_cursor, query=query)
fetch_log_group_list = response.get_loggroup_list()
next_cursor = response.get_next_cursor()
raw_size = response.get_raw_size()
raw_size_before_query = 0
raw_log_group_count_before_query = 0
if query:
raw_size_before_query = max(response.get_raw_size_before_query(), 0)
raw_log_group_count_before_query = max(response.get_raw_log_group_count_before_query(), 0)
logger.debug("shard id = %s cursor = %s next cursor = %s size: %s",
shard_id, cursor, next_cursor,
response.get_log_count())
if not next_cursor:
return FetchTaskResult(fetch_log_group_list, cursor, raw_size, raw_size_before_query, raw_log_group_count_before_query)
else:
return FetchTaskResult(fetch_log_group_list, next_cursor, raw_size, raw_size_before_query, raw_log_group_count_before_query)
except LogException as e:
exception = e
if exception.get_resp_status() == 403:
time.sleep(5)
except Exception as e1:
logger.error(e1, exc_info=True)
raise Exception(e1)
# only retry if the first request get "SLSInvalidCursor" exception
if retry_times == 0 and isinstance(exception, LogException) \
and 'invalidcursor' in exception.get_error_code().lower():
try:
cursor = loghub_client_adapter.get_end_cursor(shard_id)
except Exception:
return TaskResult(exception)
else:
break
return TaskResult(exception)