azure-kusto-data/azure/kusto/data/aio/streaming_response.py (189 lines of code) (raw):

from typing import Any, AsyncIterator, Tuple, Dict, Iterator

import aiohttp
import ijson
from ijson import IncompleteJSONError

from azure.kusto.data._models import WellKnownDataSet
from azure.kusto.data.exceptions import KustoTokenParsingError, KustoUnsupportedApiError, KustoApiError, KustoMultiApiError
from azure.kusto.data.streaming_response import JsonTokenType, FrameType, JsonToken


class JsonTokenReader:
    """Asynchronous reader that turns a streamed JSON body into ``JsonToken`` objects.

    The reader wraps ``ijson.parse_async`` and is itself an async iterator: every step
    yields the next ``JsonToken`` built from the parser's ``(path, type, value)`` event.
    """

    def __init__(self, stream: aiohttp.StreamReader):
        """Start incremental parsing of ``stream`` (numbers are parsed with ``use_float=True``)."""
        self.json_iter = ijson.parse_async(stream, use_float=True)

    def __aiter__(self) -> "JsonTokenReader":
        return self

    def __anext__(self) -> JsonToken:
        # Hands back the coroutine of read_next_token_or_throw(); ``async for`` awaits it.
        return self.read_next_token_or_throw()

    async def read_next_token_or_throw(self) -> JsonToken:
        """Read the next parser event and return it as a ``JsonToken``.

        :raises KustoTokenParsingError: if the parser reports incomplete JSON, or yields ``None``.

        ``StopAsyncIteration`` raised by the underlying parser iterator is not caught here,
        so it propagates to the caller (this is what ends ``async for`` over the reader).
        """
        try:
            next_item = await self.json_iter.__anext__()
        except IncompleteJSONError as e:
            # Keep the parser error as the cause so the original failure stays visible.
            raise KustoTokenParsingError("Unexpected end of stream") from e

        if next_item is None:
            raise KustoTokenParsingError("Unexpected end of stream")

        (token_path, token_type, token_value) = next_item
        # The parser's event name is upper-cased to look up the JsonTokenType member.
        return JsonToken(token_path, JsonTokenType[token_type.upper()], token_value)

    async def read_token_of_type(self, *token_types: JsonTokenType) -> JsonToken:
        """Read the next token and verify its type is one of ``token_types``.

        :raises KustoTokenParsingError: if the token has any other type.
        """
        token = await self.read_next_token_or_throw()
        if token.token_type not in token_types:
            raise KustoTokenParsingError(f"Expected one the following types: '{','.join(t.name for t in token_types)}' , got type {token.token_type}")
        return token

    async def read_start_object(self) -> JsonToken:
        """Read the next token, which must be ``START_MAP``."""
        return await self.read_token_of_type(JsonTokenType.START_MAP)

    async def read_start_array(self) -> JsonToken:
        """Read the next token, which must be ``START_ARRAY``."""
        return await self.read_token_of_type(JsonTokenType.START_ARRAY)

    async def read_string(self) -> str:
        """Read the next token, which must be ``STRING``, and return its value."""
        return (await self.read_token_of_type(JsonTokenType.STRING)).token_value

    async def read_boolean(self) -> bool:
        """Read the next token, which must be ``BOOLEAN``, and return its value."""
        return (await self.read_token_of_type(JsonTokenType.BOOLEAN)).token_value

    async def read_number(self) -> float:
        """Read the next token, which must be ``NUMBER``, and return its value."""
        return (await self.read_token_of_type(JsonTokenType.NUMBER)).token_value

    async def skip_children(self, prev_token: JsonToken):
        """Consume everything that belongs to ``prev_token``.

        For a ``MAP_KEY`` the key's value token is read first. If the resulting token opens
        a container, tokens are consumed until an end token with the same path is seen.
        Scalar tokens need no further reads.
        """
        if prev_token.token_type == JsonTokenType.MAP_KEY:
            prev_token = await self.read_next_token_or_throw()

        if prev_token.token_type in JsonTokenType.start_tokens():
            async for potential_end_token in self:
                if potential_end_token.token_path == prev_token.token_path and potential_end_token.token_type in JsonTokenType.end_tokens():
                    break

    async def skip_until_property_name(self, name: str) -> JsonToken:
        """Skip properties (with their values) until the key ``name`` is read; return that key token.

        :raises KustoTokenParsingError: if a token that is not a ``MAP_KEY`` is encountered where a key is expected.
        """
        while True:
            token = await self.read_token_of_type(JsonTokenType.MAP_KEY)
            if token.token_value == name:
                return token
            await self.skip_children(token)

    async def skip_until_any_property_name(self, *names: str) -> JsonToken:
        """Like ``skip_until_property_name`` but stops at the first key found in ``names``."""
        while True:
            token = await self.read_token_of_type(JsonTokenType.MAP_KEY)
            if token.token_value in names:
                return token
            await self.skip_children(token)

    async def skip_until_property_name_or_end_object(self, *names: str) -> JsonToken:
        """Skip properties until a key in ``names`` or the enclosing ``END_MAP`` is read.

        Returns the matching key token or the ``END_MAP`` token. If the reader is exhausted
        first, the loop simply ends and ``None`` is returned.

        :raises Exception: if a token that is neither ``END_MAP`` nor ``MAP_KEY`` is read.
        """
        async for token in self:
            if token.token_type == JsonTokenType.END_MAP:
                return token

            if token.token_type == JsonTokenType.MAP_KEY:
                if token.token_value in names:
                    return token
                await self.skip_children(token)
                continue

            raise Exception(f"Unexpected token {token}")

    async def skip_until_token_with_paths(self, *tokens: Tuple[JsonTokenType, str]) -> JsonToken:
        """Skip tokens until one matches any ``(token_type, token_path)`` pair in ``tokens``.

        Tokens that do not match are skipped together with their children. If the reader is
        exhausted before a match is found, the loop ends and ``None`` is returned.
        """
        async for token in self:
            if any((token.token_type == t_type and token.token_path == t_path) for (t_type, t_path) in tokens):
                return token
            await self.skip_children(token)


class StreamingDataSetEnumerator:
    """Async iterator that yields the frames of a streamed response one dict at a time.

    Each yielded dict contains at least a ``"FrameType"`` key plus the properties
    extracted for that frame type (see ``parse_frame``).
    """

    def __init__(self, reader: JsonTokenReader):
        self.reader = reader
        self.done = False  # set once the end of the top-level array was reached
        self.started = False  # set once the opening token of the top-level array was read
        self.started_primary_results = False  # set when the first primary-result table is seen
        self.finished_primary_results = False  # set when a non-primary frame follows a primary one

    def __aiter__(self) -> "StreamingDataSetEnumerator":
        return self

    async def __anext__(self) -> Dict[str, Any]:
        """Parse and return the next frame.

        :raises StopAsyncIteration: when the top-level array has ended.
        """
        if self.done:
            # Async iterators must signal exhaustion with StopAsyncIteration; a StopIteration
            # raised inside a coroutine is converted to RuntimeError by the interpreter.
            raise StopAsyncIteration()

        if not self.started:
            await self.reader.read_start_array()
            self.started = True

        token = await self.reader.skip_until_token_with_paths((JsonTokenType.START_MAP, "item"), (JsonTokenType.END_ARRAY, ""))
        # ``token`` is None when the reader ran out of tokens before a match; treat that the
        # same as the closing bracket. The token *type* has to be compared here - comparing
        # the JsonToken itself against a JsonTokenType does not test the token's type.
        if token is None or token.token_type == JsonTokenType.END_ARRAY:
            self.done = True
            raise StopAsyncIteration()

        frame_type = await self.read_frame_type()
        parsed_frame = await self.parse_frame(frame_type)

        # "TableKind" is only looked up for DataTable frames (short-circuit ``and``).
        is_primary_result = parsed_frame["FrameType"] == FrameType.DataTable and parsed_frame["TableKind"] == WellKnownDataSet.PrimaryResult.value
        if is_primary_result:
            self.started_primary_results = True
        elif self.started_primary_results:
            self.finished_primary_results = True

        return parsed_frame

    async def parse_frame(self, frame_type: FrameType) -> Dict[str, Any]:
        """Parse the remainder of the current frame according to ``frame_type``.

        * ``DataSetHeader``: returns ``IsProgressive`` and ``Version``.
        * ``DataTable``: returns ``TableId``, ``TableKind``, ``TableName``, ``Columns`` and ``Rows``.
          For primary-result tables ``Rows`` is the lazy async iterator from ``row_iterator``;
          for every other table kind the rows are read eagerly into a list.
        * ``DataSetCompletion``: returns ``HasErrors``, ``Cancelled`` and, if present, ``OneApiErrors``.

        :raises KustoUnsupportedApiError: for a progressive header or any progressive frame type.

        No branch handles other frame types; for those the function falls through and
        returns ``None``.
        """
        if frame_type == FrameType.DataSetHeader:
            frame = await self.extract_props(frame_type, ("IsProgressive", JsonTokenType.BOOLEAN), ("Version", JsonTokenType.STRING))
            if frame["IsProgressive"]:
                raise KustoUnsupportedApiError.progressive_api_unsupported()
            return frame

        if frame_type in [FrameType.TableHeader, FrameType.TableFragment, FrameType.TableCompletion, FrameType.TableProgress]:
            raise KustoUnsupportedApiError.progressive_api_unsupported()

        if frame_type == FrameType.DataTable:
            props = await self.extract_props(
                frame_type,
                ("TableId", JsonTokenType.NUMBER),
                ("TableKind", JsonTokenType.STRING),
                ("TableName", JsonTokenType.STRING),
                ("Columns", JsonTokenType.START_ARRAY),
            )
            await self.reader.skip_until_property_name("Rows")
            props["Rows"] = self.row_iterator()
            if props["TableKind"] != WellKnownDataSet.PrimaryResult.value:
                # Non-primary tables are materialized immediately.
                props["Rows"] = [r async for r in props["Rows"]]
            return props

        if frame_type == FrameType.DataSetCompletion:
            res = await self.extract_props(frame_type, ("HasErrors", JsonTokenType.BOOLEAN), ("Cancelled", JsonTokenType.BOOLEAN))
            token = await self.reader.skip_until_property_name_or_end_object("OneApiErrors")
            if token.token_type != JsonTokenType.END_MAP:
                # parse_array is a coroutine function: it must be awaited, otherwise an
                # un-awaited coroutine object is stored and the array is never consumed.
                res["OneApiErrors"] = await self.parse_array(skip_start=False)
            return res

    async def row_iterator(self) -> AsyncIterator[list]:
        """Asynchronously yield the rows of the current table, each as a list of values.

        Reads the opening token of the rows array, then yields one parsed array per row
        until the closing token is read.

        :raises KustoMultiApiError: if an object is found where a row array is expected;
            the parsed object is passed to the exception.
        """
        await self.reader.read_token_of_type(JsonTokenType.START_ARRAY)
        while True:
            token = await self.reader.read_token_of_type(JsonTokenType.START_ARRAY, JsonTokenType.END_ARRAY, JsonTokenType.START_MAP)
            if token.token_type == JsonTokenType.START_MAP:
                raise KustoMultiApiError([await self.parse_object(skip_start=True)])
            if token.token_type == JsonTokenType.END_ARRAY:
                return
            yield await self.parse_array(skip_start=True)

    async def parse_array(self, skip_start: bool) -> list:
        """Parse a JSON array into a Python list (nested containers are parsed recursively).

        :param skip_start: ``True`` if the opening ``START_ARRAY`` token was already consumed
            by the caller; ``False`` to read it here first.
        """
        if not skip_start:
            await self.reader.read_start_array()

        arr = []
        while True:
            token = await self.reader.read_token_of_type(
                JsonTokenType.NULL,
                JsonTokenType.BOOLEAN,
                JsonTokenType.NUMBER,
                JsonTokenType.STRING,
                JsonTokenType.START_MAP,
                JsonTokenType.START_ARRAY,
                JsonTokenType.END_ARRAY,
            )
            if token.token_type == JsonTokenType.END_ARRAY:
                return arr

            if token.token_type == JsonTokenType.START_MAP:
                arr.append(await self.parse_object(skip_start=True))
            elif token.token_type == JsonTokenType.START_ARRAY:
                arr.append(await self.parse_array(skip_start=True))
            else:
                arr.append(token.token_value)

    async def parse_object(self, skip_start: bool) -> Dict[str, Any]:
        """Parse a JSON object into a dict (nested containers are parsed recursively).

        :param skip_start: ``True`` if the opening ``START_MAP`` token was already consumed
            by the caller; ``False`` to read it here first.
        """
        if not skip_start:
            await self.reader.read_start_object()

        obj = {}
        while True:
            token_prop_name = await self.reader.read_token_of_type(JsonTokenType.MAP_KEY, JsonTokenType.END_MAP)
            if token_prop_name.token_type == JsonTokenType.END_MAP:
                return obj

            prop_name = token_prop_name.token_value
            token = await self.reader.read_token_of_type(
                JsonTokenType.NULL, JsonTokenType.BOOLEAN, JsonTokenType.NUMBER, JsonTokenType.STRING, JsonTokenType.START_MAP, JsonTokenType.START_ARRAY
            )
            if token.token_type == JsonTokenType.START_MAP:
                obj[prop_name] = await self.parse_object(skip_start=True)
            elif token.token_type == JsonTokenType.START_ARRAY:
                obj[prop_name] = await self.parse_array(skip_start=True)
            else:
                obj[prop_name] = token.token_value

    async def extract_props(self, frame_type: FrameType, *props: Tuple[str, JsonTokenType]) -> Dict[str, Any]:
        """Read the requested properties from the current object.

        :param frame_type: stored in the result under ``"FrameType"``.
        :param props: ``(name, expected_token_type)`` pairs. A property expected as
            ``START_ARRAY`` is parsed into a list; any other property must be a single
            token of the given type and its value is stored.
        :return: dict with ``"FrameType"`` and one entry per requested property.

        Properties are accepted in whatever order they appear in the stream; properties
        that were not requested are skipped.
        """
        result = {"FrameType": frame_type}
        props_dict = dict(props)
        while props_dict:
            name = (await self.reader.skip_until_any_property_name(*props_dict)).token_value
            expected_type = props_dict.pop(name)
            if expected_type == JsonTokenType.START_ARRAY:
                result[name] = await self.parse_array(skip_start=False)
            else:
                result[name] = (await self.reader.read_token_of_type(expected_type)).token_value
        return result

    async def read_frame_type(self) -> FrameType:
        """Skip to the ``"FrameType"`` property of the current object and return it as a ``FrameType`` member."""
        await self.reader.skip_until_property_name("FrameType")
        return FrameType[await self.reader.read_string()]