pyignite/queries/response.py (260 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio from io import SEEK_CUR import attr from collections import OrderedDict import ctypes from pyignite.connection.protocol_context import ProtocolContext from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct from pyignite.datatypes.binary import body_struct, enum_struct, schema_struct from pyignite.queries.op_codes import OP_SUCCESS from pyignite.stream import READ_BACKWARD class StatusFlagResponseHeader(ctypes.LittleEndianStructure): _pack_ = 1 _fields_ = [ ('length', ctypes.c_int), ('query_id', ctypes.c_longlong), ('flags', ctypes.c_short) ] class ResponseHeader(ctypes.LittleEndianStructure): _pack_ = 1 _fields_ = [ ('length', ctypes.c_int), ('query_id', ctypes.c_longlong), ('status_code', ctypes.c_int) ] @attr.s class Response: following = attr.ib(type=list, factory=list) protocol_context = attr.ib(type=type(ProtocolContext), default=None) _response_class_name = 'Response' def __attrs_post_init__(self): # replace None with empty list self.following = self.following or [] def __parse_header(self, stream): init_pos = stream.tell() if self.protocol_context.is_status_flags_supported(): header_class = StatusFlagResponseHeader else: header_class = ResponseHeader header_len = ctypes.sizeof(header_class) header = stream.read_ctype(header_class) stream.seek(header_len, SEEK_CUR) fields = [] has_error = False if self.protocol_context.is_status_flags_supported(): if header.flags & RHF_TOPOLOGY_CHANGED: fields = [ ('affinity_version', ctypes.c_longlong), ('affinity_minor', ctypes.c_int), ] if header.flags & RHF_ERROR: fields.append(('status_code', ctypes.c_int)) has_error = True else: has_error = header.status_code != OP_SUCCESS if fields: stream.seek(sum(ctypes.sizeof(c_type) for _, c_type in fields), SEEK_CUR) if has_error: msg_type = String.parse(stream) fields.append(('error_message', msg_type)) return not has_error, init_pos, header_class, fields def __build_response_class(self, stream, init_pos, header_class, fields): response_class = type( self._response_class_name, (header_class,), { '_pack_': 1, '_fields_': fields, } ) stream.seek(init_pos + ctypes.sizeof(response_class)) return response_class def parse(self, stream): success, init_pos, header_class, fields = self.__parse_header(stream) if success: self._parse_success(stream, fields) return self.__build_response_class(stream, init_pos, header_class, fields) async def parse_async(self, stream): success, init_pos, header_class, fields = self.__parse_header(stream) if success: await self._parse_success_async(stream, fields) return self.__build_response_class(stream, init_pos, header_class, fields) def _parse_success(self, stream, fields: list): for name, ignite_type in self.following: c_type = ignite_type.parse(stream) fields.append((name, c_type)) async def _parse_success_async(self, stream, fields: list): for name, ignite_type in self.following: c_type = await ignite_type.parse_async(stream) fields.append((name, c_type)) def to_python(self, ctypes_object, **kwargs): if not self.following: return None result = OrderedDict() for name, c_type in self.following: result[name] = c_type.to_python(getattr(ctypes_object, name), **kwargs) return result async def to_python_async(self, ctypes_object, **kwargs): if not self.following: return None values = await asyncio.gather( *[c_type.to_python_async(getattr(ctypes_object, name), **kwargs) for name, c_type in self.following] ) return OrderedDict([(name, values[i]) for i, (name, _) in enumerate(self.following)]) @attr.s class SQLResponse(Response): """ The response class of SQL functions is special in the way the row-column data is counted in it. Basically, Ignite thin client API is following a “counter right before the counted objects” rule in most of its parts. SQL ops are breaking this rule. """ include_field_names = attr.ib(type=bool, default=False) has_cursor = attr.ib(type=bool, default=False) _response_class_name = 'SQLResponse' def fields_or_field_count(self): if self.include_field_names: return 'fields', StringArray return 'field_count', Int def _parse_success(self, stream, fields: list): body_struct = self.__create_body_struct() body_class = body_struct.parse(stream) body = stream.read_ctype(body_class, direction=READ_BACKWARD) data_fields, field_count = [], self.__get_fields_count(body) for i in range(body.row_count): row_fields = [] for j in range(field_count): field_class = AnyDataObject.parse(stream) row_fields.append(('column_{}'.format(j), field_class)) self.__row_post_process(i, row_fields, data_fields) self.__body_class_post_process(body_class, fields, data_fields) async def _parse_success_async(self, stream, fields: list): body_struct = self.__create_body_struct() body_class = await body_struct.parse_async(stream) body = stream.read_ctype(body_class, direction=READ_BACKWARD) data_fields, field_count = [], self.__get_fields_count(body) for i in range(body.row_count): row_fields = [] for j in range(field_count): field_class = await AnyDataObject.parse_async(stream) row_fields.append(('column_{}'.format(j), field_class)) self.__row_post_process(i, row_fields, data_fields) self.__body_class_post_process(body_class, fields, data_fields) def __create_body_struct(self): following = [self.fields_or_field_count(), ('row_count', Int)] if self.has_cursor: following.insert(0, ('cursor', Long)) return Struct(following) def __get_fields_count(self, body): if self.include_field_names: return body.fields.length return body.field_count @staticmethod def __row_post_process(idx, row_fields, data_fields): row_class = type( 'SQLResponseRow', (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': row_fields, } ) data_fields.append((f'row_{idx}', row_class)) @staticmethod def __body_class_post_process(body_class, fields, data_fields): data_class = type( 'SQLResponseData', (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': data_fields, } ) fields += body_class._fields_ + [ ('data', data_class), ('more', ctypes.c_byte), ] def to_python(self, ctypes_object, **kwargs): if getattr(ctypes_object, 'status_code', 0) == 0: result = self.__to_python_result_header(ctypes_object, **kwargs) for row_item in ctypes_object.data._fields_: row_name = row_item[0] row_object = getattr(ctypes_object.data, row_name) row = [] for col_item in row_object._fields_: col_name = col_item[0] col_object = getattr(row_object, col_name) row.append(AnyDataObject.to_python(col_object, **kwargs)) result['data'].append(row) return result async def to_python_async(self, ctypes_object, **kwargs): if getattr(ctypes_object, 'status_code', 0) == 0: result = self.__to_python_result_header(ctypes_object, **kwargs) data_coro = [] for row_item in ctypes_object.data._fields_: row_name = row_item[0] row_object = getattr(ctypes_object.data, row_name) row_coro = [] for col_item in row_object._fields_: col_name = col_item[0] col_object = getattr(row_object, col_name) row_coro.append(AnyDataObject.to_python_async(col_object, **kwargs)) data_coro.append(asyncio.gather(*row_coro)) result['data'] = await asyncio.gather(*data_coro) return result @staticmethod def __to_python_result_header(ctypes_object, *args, **kwargs): result = { 'more': Bool.to_python(ctypes_object.more, *args, **kwargs), 'data': [], } if hasattr(ctypes_object, 'fields'): result['fields'] = StringArray.to_python(ctypes_object.fields, *args, **kwargs) else: result['field_count'] = Int.to_python(ctypes_object.field_count, *args, **kwargs) if hasattr(ctypes_object, 'cursor'): result['cursor'] = Long.to_python(ctypes_object.cursor, *args, **kwargs) return result class BinaryTypeResponse(Response): _response_class_name = 'GetBinaryTypeResponse' def _parse_success(self, stream, fields: list): type_exists = self.__process_type_exists(stream, fields) if type_exists.value: resp_body_type = body_struct.parse(stream) fields.append(('body', resp_body_type)) resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD) if resp_body.is_enum: resp_enum = enum_struct.parse(stream) fields.append(('enums', resp_enum)) resp_schema_type = schema_struct.parse(stream) fields.append(('schema', resp_schema_type)) async def _parse_success_async(self, stream, fields: list): type_exists = self.__process_type_exists(stream, fields) if type_exists.value: resp_body_type = await body_struct.parse_async(stream) fields.append(('body', resp_body_type)) resp_body = stream.read_ctype(resp_body_type, direction=READ_BACKWARD) if resp_body.is_enum: resp_enum = await enum_struct.parse_async(stream) fields.append(('enums', resp_enum)) resp_schema_type = await schema_struct.parse_async(stream) fields.append(('schema', resp_schema_type)) @staticmethod def __process_type_exists(stream, fields): fields.append(('type_exists', ctypes.c_byte)) type_exists = stream.read_ctype(ctypes.c_byte) stream.seek(ctypes.sizeof(ctypes.c_byte), SEEK_CUR) return type_exists def to_python(self, ctypes_object, **kwargs): if getattr(ctypes_object, 'status_code', 0) == 0: result = { 'type_exists': Bool.to_python(ctypes_object.type_exists) } if hasattr(ctypes_object, 'body'): result.update(body_struct.to_python(ctypes_object.body)) if hasattr(ctypes_object, 'enums'): result['enums'] = enum_struct.to_python(ctypes_object.enums) if hasattr(ctypes_object, 'schema'): result['schema'] = { x['schema_id']: [ z['schema_field_id'] for z in x['schema_fields'] ] for x in schema_struct.to_python(ctypes_object.schema) } return result async def to_python_async(self, ctypes_object, **kwargs): return self.to_python(ctypes_object, **kwargs)