pyignite/queries/query.py (205 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import ctypes import inspect import logging import time from io import SEEK_CUR import attr from pyignite.api.result import APIResult from pyignite.connection import Connection, AioConnection from pyignite.constants import MAX_LONG, RHF_TOPOLOGY_CHANGED from pyignite.queries import op_codes from pyignite.queries.response import Response from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD logger = logging.getLogger('.'.join(__name__.split('.')[:-1])) def query_perform(query_struct, conn, post_process_fun=None, **kwargs): async def _async_internal(): result = await query_struct.perform_async(conn, **kwargs) if post_process_fun: return post_process_fun(result) return result def _internal(): result = query_struct.perform(conn, **kwargs) if post_process_fun: return post_process_fun(result) return result if isinstance(conn, AioConnection): return _async_internal() return _internal() _QUERY_COUNTER = 0 def _get_query_id(): global _QUERY_COUNTER if _QUERY_COUNTER >= MAX_LONG: return 0 _QUERY_COUNTER += 1 return _QUERY_COUNTER _OP_CODES = {code: name for name, code in inspect.getmembers(op_codes) if name.startswith('OP_')} def _get_op_code_name(code): global _OP_CODES return _OP_CODES.get(code) def _sec_to_millis(secs): return int(secs * 1000) @attr.s class Query: op_code = attr.ib(type=int) following = attr.ib(type=list, factory=list) query_id = attr.ib(type=int) response_type = attr.ib(type=type(Response), default=Response) _query_c_type = None _start_ts = 0.0 @query_id.default def _set_query_id(self): return _get_query_id() @classmethod def build_c_type(cls): if cls._query_c_type is None: cls._query_c_type = type( cls.__name__, (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': [ ('length', ctypes.c_int), ('op_code', ctypes.c_short), ('query_id', ctypes.c_longlong), ], }, ) return cls._query_c_type def from_python(self, stream, values: dict = None): init_pos, header = stream.tell(), self._build_header(stream) values = values if values else None for name, c_type in self.following: c_type.from_python(stream, values[name]) self.__write_header(stream, header, init_pos) async def from_python_async(self, stream, values: dict = None): init_pos, header = stream.tell(), self._build_header(stream) values = values if values else None for name, c_type in self.following: await c_type.from_python_async(stream, values[name]) self.__write_header(stream, header, init_pos) def _build_header(self, stream): global _QUERY_COUNTER header_class = self.build_c_type() header_len = ctypes.sizeof(header_class) stream.seek(header_len, SEEK_CUR) header = header_class() header.op_code = self.op_code header.query_id = self.query_id return header @staticmethod def __write_header(stream, header, init_pos): header.length = stream.tell() - init_pos - ctypes.sizeof(ctypes.c_int) stream.seek(init_pos) stream.write(header) def perform( self, conn: Connection, query_params: dict = None, response_config: list = None, **kwargs, ) -> APIResult: """ Perform query and process result. :param conn: connection to Ignite server, :param query_params: (optional) dict of named query parameters. Defaults to no parameters, :param response_config: (optional) response configuration − list of (name, type_hint) tuples. Defaults to empty return value, :return: instance of :class:`~pyignite.api.result.APIResult` with raw value (may undergo further processing in API functions). """ try: self._on_query_started(conn) with BinaryStream(conn.client) as stream: self.from_python(stream, query_params) response_data = conn.request(stream.getvalue()) response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with BinaryStream(conn.client, response_data) as stream: response_ctype = response_struct.parse(stream) response = stream.read_ctype(response_ctype, direction=READ_BACKWARD) result = self.__post_process_response(conn, response_struct, response) if result.status == 0: result.value = response_struct.to_python(response) self._on_query_finished(conn, result=result) return result except Exception as e: self._on_query_finished(conn, err=e) raise e async def perform_async( self, conn: AioConnection, query_params: dict = None, response_config: list = None, **kwargs, ) -> APIResult: """ Perform query and process result. :param conn: connection to Ignite server, :param query_params: (optional) dict of named query parameters. Defaults to no parameters, :param response_config: (optional) response configuration − list of (name, type_hint) tuples. Defaults to empty return value, :return: instance of :class:`~pyignite.api.result.APIResult` with raw value (may undergo further processing in API functions). """ try: self._on_query_started(conn) with AioBinaryStream(conn.client) as stream: await self.from_python_async(stream, query_params) data = await conn.request(self.query_id, stream.getvalue()) response_struct = self.response_type(protocol_context=conn.protocol_context, following=response_config, **kwargs) with AioBinaryStream(conn.client, data) as stream: response_ctype = await response_struct.parse_async(stream) response = stream.read_ctype(response_ctype, direction=READ_BACKWARD) result = self.__post_process_response(conn, response_struct, response) if result.status == 0: result.value = await response_struct.to_python_async(response) self._on_query_finished(conn, result=result) return result except Exception as e: self._on_query_finished(conn, err=e) raise e @staticmethod def __post_process_response(conn, response_struct, response): if getattr(response, 'flags', False) & RHF_TOPOLOGY_CHANGED: # update latest affinity version new_affinity = (response.affinity_version, response.affinity_minor) old_affinity = conn.client.affinity_version if new_affinity > old_affinity: conn.client.affinity_version = new_affinity # build result return APIResult(response) @staticmethod def _enabled_query_listener(conn): client = conn.client return client._event_listeners and client._event_listeners.enabled_query_listener @staticmethod def _event_listener(conn): return conn.client._event_listeners def _on_query_started(self, conn): self._start_ts = time.monotonic() if logger.isEnabledFor(logging.DEBUG): logger.debug("Start query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s)", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid) if self._enabled_query_listener(conn): self._event_listener(conn).publish_query_start(conn.host, conn.port, conn.uuid, self.query_id, self.op_code, _get_op_code_name(self.op_code)) def _on_query_finished(self, conn, result=None, err=None): dur_ms = _sec_to_millis(time.monotonic() - self._start_ts) if result and result.status != 0: err = result.message if err: if logger.isEnabledFor(logging.DEBUG): logger.debug("Failed to perform query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " "in %d ms: %s", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid, dur_ms, err) if self._enabled_query_listener(conn): self._event_listener(conn).publish_query_fail(conn.host, conn.port, conn.uuid, self.query_id, self.op_code, _get_op_code_name(self.op_code), dur_ms, err) else: if logger.isEnabledFor(logging.DEBUG): logger.debug("Finished query(query_id=%d, op_type=%s, host=%s, port=%d, node_id=%s) " "successfully in %d ms", self.query_id, _get_op_code_name(self.op_code), conn.host, conn.port, conn.uuid, dur_ms) if self._enabled_query_listener(conn): self._event_listener(conn).publish_query_success(conn.host, conn.port, conn.uuid, self.query_id, self.op_code, _get_op_code_name(self.op_code), dur_ms) class ConfigQuery(Query): """ This is a special query, used for creating caches with configuration. """ _query_c_type = None @classmethod def build_c_type(cls): if cls._query_c_type is None: cls._query_c_type = type( cls.__name__, (ctypes.LittleEndianStructure,), { '_pack_': 1, '_fields_': [ ('length', ctypes.c_int), ('op_code', ctypes.c_short), ('query_id', ctypes.c_longlong), ('config_length', ctypes.c_int), ], }, ) return cls._query_c_type def _build_header(self, stream): header = super()._build_header(stream) header.config_length = header.length - ctypes.sizeof(type(header)) return header