python-client/pypegasus/operate/packet.py (197 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import struct
import ctypes

from thrift.Thrift import TMessageType

from pypegasus import utils
from pypegasus.rrdb import (
    meta, rrdb)
from pypegasus.base.ttypes import (
    rocksdb_error_types, error_code, gpid)
from pypegasus.utils import tools

# Alias for the imported ``gpid`` class.  The constructors below take a
# parameter that is also called ``gpid`` (kept for caller compatibility),
# which hides the class inside those functions.
_gpid_type = gpid


class ThriftHeader(object):
    """Fixed-size header that is packed in front of a request body.

    The packed layout (see ``_PACKER``) is big-endian: a 4-byte type tag,
    nine signed 32-bit integers and one signed 64-bit integer, 48 bytes
    in total.
    """

    HEADER_LENGTH = 48
    HEADER_TYPE = b'THFT'

    # Compiled once for the class instead of on every to_bytes() call.
    _PACKER = struct.Struct('>4siiiiiiiiiq')

    def __init__(self, gpid, partition_hash=0):
        """Initialise the header fields.

        :param gpid: object providing ``get_app_id()`` and ``get_pidx()``.
        :param partition_hash: value stored in the trailing 64-bit field.
        """
        self.hdr_version = 0
        self.header_length = self.HEADER_LENGTH
        # Both CRC fields are initialised to 0; nothing in this module
        # computes them.
        self.header_crc32 = 0
        self.body_length = 0
        self.body_crc32 = 0
        self.app_id = gpid.get_app_id()
        self.partition_index = gpid.get_pidx()
        self.client_timeout = 0
        self.thread_hash = 0
        self.partition_hash = partition_hash

    def to_bytes(self):
        """Pack the current field values.

        :return: a ``ctypes`` string buffer of ``_PACKER.size`` bytes
                 holding the packed header.
        """
        fields = (self.HEADER_TYPE,
                  self.hdr_version,
                  self.header_length,
                  self.header_crc32,
                  self.body_length,
                  self.body_crc32,
                  self.app_id,
                  self.partition_index,
                  self.client_timeout,
                  self.thread_hash,
                  self.partition_hash)
        buff = ctypes.create_string_buffer(self._PACKER.size)
        self._PACKER.pack_into(buff, 0, *fields)
        return buff


class ClientOperator(object):
    """Base class bundling a request, its target gpid and its header."""

    def __init__(self, gpid=None, request=None, partition_hash=0):
        """Create an operator.

        :param gpid: target partition id; when omitted (``None``) a fresh
                     default-constructed gpid is created for this instance.
        :param request: request payload handed to the args struct in
                        ``send_data``.
        :param partition_hash: forwarded to :class:`ThriftHeader`.
        """
        # A ``gpid()`` call in the signature would be evaluated only once,
        # at definition time, and that single object would be shared by
        # every operator built without an explicit gpid.
        if gpid is None:
            gpid = _gpid_type()
        self.pid = gpid
        self.header = ThriftHeader(gpid, partition_hash)
        self.error_code = error_code()
        self.request = request
        self.response = None

    def prepare_thrift_header(self, body_length):
        """Finalize the header for a body of ``body_length`` bytes.

        Stores ``body_length`` and the thread hash derived from the
        header's app id and partition index, then packs the header.

        :return: the buffer produced by :meth:`ThriftHeader.to_bytes`.
        """
        self.header.body_length = body_length
        self.header.thread_hash = tools.dsn_gpid_to_thread_hash(
            self.header.app_id, self.header.partition_index)
        return self.header.to_bytes()

    @staticmethod
    def parse_result(resp):
        """Return ``resp.error``."""
        return resp.error


class QueryCfgOperator(ClientOperator):
    """Operator for ``RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX",
                                TMessageType.CALL, seqid)
        args = meta.query_cfg_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return the response object unchanged."""
        return resp


class RrdbTtlOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_TTL``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_TTL", TMessageType.CALL, seqid)
        # NOTE(review): the TTL request is serialized with rrdb.get_args,
        # not a TTL-specific args type -- presumably the two have the same
        # wire shape; confirm against the generated rrdb module.
        args = rrdb.get_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Convert ``resp.error`` in place.

        :return: ``(converted error, resp.ttl_seconds)``.
        """
        resp.error = utils.tools.convert_error_type(resp.error)
        return resp.error, resp.ttl_seconds


class RrdbGetOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_GET``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_GET", TMessageType.CALL, seqid)
        args = rrdb.get_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Convert ``resp.error`` in place.

        :return: ``(converted error, resp.value.data)``.
        """
        resp.error = utils.tools.convert_error_type(resp.error)
        return resp.error, resp.value.data


class RrdbMultiGetOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_MULTI_GET``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_MULTI_GET",
                                TMessageType.CALL, seqid)
        args = rrdb.multi_get_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Collect the returned key/value pairs and convert the error.

        The pairs in ``resp.kvs`` are only read when the raw error is
        ``kOk`` or ``kIncomplete``; otherwise the mapping is empty.
        ``resp.error`` is overwritten with its converted value.

        :return: ``(converted error, {kv.key.data: kv.value.data})``.
        """
        readable = (rocksdb_error_types.kOk.value,
                    rocksdb_error_types.kIncomplete.value)
        if resp.error in readable:
            data = {kv.key.data: kv.value.data for kv in resp.kvs}
        else:
            data = {}
        resp.error = utils.tools.convert_error_type(resp.error)
        return resp.error, data


class RrdbPutOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_PUT``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_PUT", TMessageType.CALL, seqid)
        args = rrdb.put_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return ``(resp.error, None)``."""
        return resp.error, None


class RrdbMultiPutOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_MULTI_PUT``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_MULTI_PUT",
                                TMessageType.CALL, seqid)
        args = rrdb.multi_put_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return ``(resp.error, None)``."""
        return resp.error, None


class RrdbRemoveOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_REMOVE``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_REMOVE",
                                TMessageType.CALL, seqid)
        args = rrdb.remove_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return ``(resp.error, None)``."""
        return resp.error, None


class RrdbMultiRemoveOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_MULTI_REMOVE``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_MULTI_REMOVE",
                                TMessageType.CALL, seqid)
        args = rrdb.multi_remove_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return ``(resp.error, resp.count)``."""
        return resp.error, resp.count


class RrdbSortkeyCountOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_SORTKEY_COUNT``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_SORTKEY_COUNT",
                                TMessageType.CALL, seqid)
        args = rrdb.sortkey_count_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return ``(resp.error, resp.count)``."""
        return resp.error, resp.count


class RrdbGetScannerOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_GET_SCANNER``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_GET_SCANNER",
                                TMessageType.CALL, seqid)
        args = rrdb.get_scanner_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return a dict with the response's error, context_id and kvs."""
        return {'error': resp.error,
                'context_id': resp.context_id,
                'kvs': resp.kvs}


class RrdbScanOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_SCAN``."""

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_SCAN", TMessageType.CALL, seqid)
        args = rrdb.scan_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()

    @staticmethod
    def parse_result(resp):
        """Return a dict with the response's error, context_id and kvs."""
        return {'error': resp.error,
                'context_id': resp.context_id,
                'kvs': resp.kvs}


class RrdbClearScannerOperator(ClientOperator):
    """Operator for ``RPC_RRDB_RRDB_CLEAR_SCANNER``.

    No ``parse_result`` override is defined here, so the one inherited
    from :class:`ClientOperator` applies.
    """

    def __init__(self, gpid, request, partition_hash):
        ClientOperator.__init__(self, gpid, request, partition_hash)

    def send_data(self, oprot, seqid):
        """Write the call message for this request to ``oprot``."""
        oprot.writeMessageBegin("RPC_RRDB_RRDB_CLEAR_SCANNER",
                                TMessageType.CALL, seqid)
        args = rrdb.clear_scanner_args(self.request)
        args.write(oprot)
        oprot.writeMessageEnd()