def consumer_fetch_task()

in aliyun/log/consumer/tasks.py [0:0]


def consumer_fetch_task(loghub_client_adapter, shard_id, cursor, max_fetch_log_group_size=1000, end_cursor=None, query=None):
    """Pull one batch of log groups from a shard and wrap the outcome in a task result.

    The pull is attempted with ``cursor``. If that first attempt fails with a
    ``LogException`` whose error code contains ``invalidcursor`` (compared
    case-insensitively), the cursor is replaced by the shard's end cursor and
    the pull is attempted once more. Any other ``LogException``, or a
    ``LogException`` on the second attempt, ends the task.

    :param loghub_client_adapter: client object providing ``pull_logs`` and
        ``get_end_cursor``.
    :param shard_id: shard to read from.
    :param cursor: cursor at which to start pulling.
    :param max_fetch_log_group_size: forwarded to ``pull_logs`` as ``count``.
    :param end_cursor: forwarded to ``pull_logs`` unchanged.
    :param query: forwarded to ``pull_logs``; when truthy, the pre-query size
        and log-group count are also read from the response.
    :return: ``FetchTaskResult`` on success (carrying the response's next
        cursor, or the cursor that was used for the pull when the response has
        none); ``TaskResult(exception)`` wrapping the last ``LogException``
        when the pull fails.
    :raise Exception: any non-``LogException`` error raised while pulling or
        reading the response is logged and re-raised unchanged.
    """
    exception = None

    # Only the first iteration is ever allowed to continue, so at most two
    # pulls are performed even though the range is 3.
    for retry_times in range(3):
        try:
            response = loghub_client_adapter.pull_logs(shard_id, cursor, count=max_fetch_log_group_size, end_cursor=end_cursor, query=query)
            fetch_log_group_list = response.get_loggroup_list()
            next_cursor = response.get_next_cursor()
            raw_size = response.get_raw_size()
            raw_size_before_query = 0
            raw_log_group_count_before_query = 0
            if query:
                # Negative values reported by the response are clamped to 0.
                raw_size_before_query = max(response.get_raw_size_before_query(), 0)
                raw_log_group_count_before_query = max(response.get_raw_log_group_count_before_query(), 0)
            logger.debug("shard id = %s cursor = %s next cursor = %s size: %s",
                         shard_id, cursor, next_cursor,
                         response.get_log_count())
            # Without a next cursor, hand back the cursor used for this pull.
            return FetchTaskResult(fetch_log_group_list, next_cursor or cursor, raw_size,
                                   raw_size_before_query, raw_log_group_count_before_query)
        except LogException as e:
            exception = e
            if exception.get_resp_status() == 403:
                # Back off before the failure is reported; the sleep alone
                # does not trigger another pull.
                time.sleep(5)
        except Exception as e1:
            logger.error(e1, exc_info=True)
            # Re-raise the original error so its type and traceback survive.
            raise

        # only retry if the first request get "SLSInvalidCursor" exception
        if retry_times != 0 or not isinstance(exception, LogException):
            break
        # Guard against a missing error code so the original error is reported
        # instead of an AttributeError.
        error_code = exception.get_error_code() or ''
        if 'invalidcursor' not in error_code.lower():
            break

        try:
            cursor = loghub_client_adapter.get_end_cursor(shard_id)
        except Exception:
            # The fallback is best effort: record why it failed, then report
            # the original invalid-cursor error.
            logger.warning("failed to get end cursor of shard %s after invalid cursor error",
                           shard_id, exc_info=True)
            return TaskResult(exception)

    return TaskResult(exception)