in connectors/sources/graphql.py [0:0]
def extract_pagination_info(self, data):
    """Find the paginated object in a response and read its cursor details.

    Walks ``data`` along the dot-separated path held in
    ``self.pagination_key`` and then reads ``pageInfo`` from the object found
    at the end of that path.

    Args:
        data: Response payload to search; every value along the path must be
            a dictionary.

    Returns:
        A ``(has_next_page, end_cursor, last_key)`` tuple, where ``last_key``
        is the final segment of ``self.pagination_key``.

    Raises:
        ConfigurableFieldValueError: If a path segment is missing, if the
            path runs through a value that is not a dictionary, or if the
            target object has no ``pageInfo`` dictionary containing both
            ``hasNextPage`` and ``endCursor``.
    """
    pagination_key_path = self.pagination_key.split(".")
    for key in pagination_key_path:
        if isinstance(data, list):
            msg = "Found list while processing Pagination key. Please check the query and provide path upto dictionary."
            raise ConfigurableFieldValueError(msg)
        if not isinstance(data, dict):
            # A null or scalar value on the path is not a list; name the
            # actual type so the message does not mislead.
            msg = f"Found value of type '{type(data).__name__}' while processing Pagination key: {key}. Please check the query and provide path upto dictionary."
            raise ConfigurableFieldValueError(msg)
        if key not in data:
            msg = f"Pagination key: {key} not found in the response. Please check the query."
            raise ConfigurableFieldValueError(msg)
        data = data[key]

    page_info = data.get("pageInfo") if isinstance(data, dict) else None
    # Require a real dictionary: a truthy non-dict ``pageInfo`` would otherwise
    # surface as a TypeError/AttributeError instead of a configuration error.
    if not isinstance(page_info, dict) or not {"hasNextPage", "endCursor"}.issubset(page_info):
        msg = "Pagination is enabled but the query is missing 'pageInfo'. Please include 'pageInfo { hasNextPage endCursor }' in the query to support pagination."
        raise ConfigurableFieldValueError(msg)

    return page_info["hasNextPage"], page_info["endCursor"], pagination_key_path[-1]