def extract_pagination_info()

in connectors/sources/graphql.py [0:0]


    def extract_pagination_info(self, data):
        """Locate the paginated object in a response and read its cursor state.

        Walks ``data`` along the dot-separated path held in
        ``self.pagination_key`` and reads ``pageInfo`` from the object found at
        the end of that path.

        Args:
            data: Response payload to traverse. Every step of the path must
                resolve to a dict.

        Returns:
            A ``(has_next_page, end_cursor, key)`` tuple, where ``key`` is the
            last segment of ``self.pagination_key``.

        Raises:
            ConfigurableFieldValueError: If a path segment is missing, if a
                non-dict value is reached while path segments remain, or if
                the object at the end of the path has no ``pageInfo`` dict
                carrying both ``hasNextPage`` and ``endCursor``.
        """
        pagination_key_path = self.pagination_key.split(".")
        for key in pagination_key_path:
            # This guard fires for any non-dict value (list, None, scalar),
            # not only lists; the message text is kept as-is for callers
            # that may match on it.
            if not isinstance(data, dict):
                msg = "Found list while processing Pagination key. Please check the query and provide path upto dictionary."
                raise ConfigurableFieldValueError(msg)
            if key not in data:
                msg = f"Pagination key: {key} not found in the response. Please check the query."
                raise ConfigurableFieldValueError(msg)
            data = data[key]

        # The final path segment may itself resolve to a non-dict value.
        page_info = data.get("pageInfo") if isinstance(data, dict) else None

        # Insist on a dict: a truthy non-dict ``pageInfo`` (number, list, ...)
        # must surface as a configuration error rather than a TypeError or
        # AttributeError raised while probing it.
        if not isinstance(page_info, dict) or not {
            "hasNextPage",
            "endCursor",
        }.issubset(page_info):
            msg = "Pagination is enabled but the query is missing 'pageInfo'. Please include 'pageInfo { hasNextPage endCursor }' in the query to support pagination."
            raise ConfigurableFieldValueError(msg)

        return (
            page_info["hasNextPage"],
            page_info["endCursor"],
            pagination_key_path[-1],
        )