cupid/runtime/ctypes_libs.py (354 lines of code) (raw):

# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
from ctypes import CDLL, RTLD_GLOBAL, create_string_buffer, \
    c_char, c_void_p, c_char_p, c_int32, c_uint32, byref, POINTER
from ctypes.util import find_library
from io import IOBase

from odps.compat import BytesIO, six
from odps.utils import to_binary

from ..errors import SubprocessStreamEOFError, CupidCppError
from ..utils import get_environ

# Locate the native helper library. Try, in order:
#   1. the system search done by ctypes.util.find_library;
#   2. every directory listed in LD_LIBRARY_PATH;
#   3. the current working directory (last-resort fallback).
_lib_path = find_library('odps_subprocess')
if _lib_path is None:
    ld_paths = (get_environ('LD_LIBRARY_PATH') or "").split(':')
    for p in ld_paths:
        so_path = os.path.join(p, 'libodps_subprocess.so')
        if os.path.exists(so_path):
            _lib_path = so_path
            break
if _lib_path is None:
    _lib_path = os.path.join(os.getcwd(), 'libodps_subprocess.so')

try:
    # RTLD_GLOBAL makes the library's symbols available for resolving
    # libraries that are loaded after it.
    odps_subproc = CDLL(_lib_path, mode=RTLD_GLOBAL)
except OSError as ex:
    # Report which path was tried and why the loader refused it. Without
    # this, a missing file and a missing transitive dependency produce the
    # same opaque message.
    raise ImportError('Failed to load libodps_subprocess.so. (path: %s, error: %s)'
                      % (_lib_path, ex))

# --- native std::string helpers and container entry points ----------------
# argtypes/restype are declared so ctypes converts pointers correctly;
# functions without declarations use ctypes' default (int) conventions.
Subprocess_Container_Init = odps_subproc.Subprocess_Container_Init

Subprocess_StdString_Size = odps_subproc.Subprocess_StdString_Size
Subprocess_StdString_Size.argtypes = [c_void_p]
Subprocess_StdString_Size.restype = c_uint32

Subprocess_StdString_CopyTo = odps_subproc.Subprocess_StdString_CopyTo
Subprocess_StdString_CopyTo.argtypes = [c_char_p, c_void_p, c_uint32]

Subprocess_StdString_delete = odps_subproc.Subprocess_StdString_delete
Subprocess_StdString_delete.argtypes = [c_void_p]

Subprocess_StartFDReceiver = odps_subproc.Subprocess_StartFDReceiver
# --- std::istream bindings ------------------------------------------------
Subprocess_StdIStream_eof = odps_subproc.Subprocess_StdIStream_eof
Subprocess_StdIStream_eof.argtypes = [c_void_p]
Subprocess_StdIStream_eof.restype = c_int32

Subprocess_StdIStream_read = odps_subproc.Subprocess_StdIStream_read
Subprocess_StdIStream_read.argtypes = [c_void_p, c_char_p, c_uint32]
Subprocess_StdIStream_read.restype = c_uint32

Subprocess_StdIStream_seekg = odps_subproc.Subprocess_StdIStream_seekg
Subprocess_StdIStream_seekg.argtypes = [c_void_p, c_int32, c_int32]

Subprocess_StdIStream_tellg = odps_subproc.Subprocess_StdIStream_tellg
Subprocess_StdIStream_tellg.argtypes = [c_void_p]
Subprocess_StdIStream_tellg.restype = c_uint32

Subprocess_StdIStream_getline = odps_subproc.Subprocess_StdIStream_getline
Subprocess_StdIStream_getline.argtypes = [c_void_p]
Subprocess_StdIStream_getline.restype = c_void_p

Subprocess_StdIStream_delete = odps_subproc.Subprocess_StdIStream_delete
Subprocess_StdIStream_delete.argtypes = [c_void_p]

# --- std::ostream bindings ------------------------------------------------
Subprocess_StdOStream_write = odps_subproc.Subprocess_StdOStream_write
Subprocess_StdOStream_write.argtypes = [c_void_p, c_char_p, c_uint32]

Subprocess_StdOStream_flush = odps_subproc.Subprocess_StdOStream_flush
Subprocess_StdOStream_flush.argtypes = [c_void_p]

Subprocess_StdOStream_delete = odps_subproc.Subprocess_StdOStream_delete
Subprocess_StdOStream_delete.argtypes = [c_void_p]

# --- channel configuration bindings ---------------------------------------
ChannelConf_new = odps_subproc.ChannelConf_new
ChannelConf_new.restype = c_void_p

ChannelConf_delete = odps_subproc.ChannelConf_delete
ChannelConf_delete.argtypes = [c_void_p]

ChannelConf_SetInt = odps_subproc.ChannelConf_SetInt
ChannelConf_SetInt.argtypes = [c_void_p, c_char_p, c_int32]

# --- channel input-stream result bindings ---------------------------------
ChannelResultInputStreamPtr_GetResult = odps_subproc.ChannelResultInputStreamPtr_GetResult
ChannelResultInputStreamPtr_GetResult.argtypes = [c_void_p, c_int32, POINTER(c_void_p)]
ChannelResultInputStreamPtr_GetResult.restype = c_void_p

ChannelResultInputStreamPtr_GetStream = odps_subproc.ChannelResultInputStreamPtr_GetStream
ChannelResultInputStreamPtr_GetStream.argtypes = [c_void_p]
ChannelResultInputStreamPtr_GetStream.restype = c_void_p

ChannelResultInputStreamPtr_GetFD = odps_subproc.ChannelResultInputStreamPtr_GetFD
ChannelResultInputStreamPtr_GetFD.argtypes = [c_void_p]
ChannelResultInputStreamPtr_GetFD.restype = c_int32

ChannelResultInputStreamPtr_delete = odps_subproc.ChannelResultInputStreamPtr_delete
ChannelResultInputStreamPtr_delete.argtypes = [c_void_p]

# --- channel output-stream result bindings --------------------------------
ChannelResultOutputStreamPtr_GetResult = odps_subproc.ChannelResultOutputStreamPtr_GetResult
ChannelResultOutputStreamPtr_GetResult.argtypes = [c_void_p, c_int32, POINTER(c_void_p)]
ChannelResultOutputStreamPtr_GetResult.restype = c_void_p

ChannelResultOutputStreamPtr_GetStream = odps_subproc.ChannelResultOutputStreamPtr_GetStream
ChannelResultOutputStreamPtr_GetStream.argtypes = [c_void_p]
ChannelResultOutputStreamPtr_GetStream.restype = c_void_p

ChannelResultOutputStreamPtr_GetFD = odps_subproc.ChannelResultOutputStreamPtr_GetFD
ChannelResultOutputStreamPtr_GetFD.argtypes = [c_void_p]
ChannelResultOutputStreamPtr_GetFD.restype = c_int32

ChannelResultOutputStreamPtr_delete = odps_subproc.ChannelResultOutputStreamPtr_delete
ChannelResultOutputStreamPtr_delete.argtypes = [c_void_p]

# --- channel slave client bindings ----------------------------------------
ChannelSlaveClient_new = odps_subproc.ChannelSlaveClient_new
ChannelSlaveClient_new.argtypes = [c_int32, c_int32, c_char_p]
ChannelSlaveClient_new.restype = c_void_p

ChannelSlaveClient_delete = odps_subproc.ChannelSlaveClient_delete
ChannelSlaveClient_delete.argtypes = [c_void_p]

ChannelSlaveClient_Start = odps_subproc.ChannelSlaveClient_Start
ChannelSlaveClient_Start.argtypes = [c_void_p]

ChannelSlaveClient_Stop = odps_subproc.ChannelSlaveClient_Stop
ChannelSlaveClient_Stop.argtypes = [c_void_p]

ChannelSlaveClient_SyncCall = odps_subproc.ChannelSlaveClient_SyncCall
ChannelSlaveClient_SyncCall.argtypes = [c_void_p, c_char_p, c_char_p, c_uint32, c_int32, POINTER(c_void_p)]
ChannelSlaveClient_SyncCall.restype = c_void_p

ChannelSlaveClient_CreateInputStream = odps_subproc.ChannelSlaveClient_CreateInputStream
ChannelSlaveClient_CreateInputStream.argtypes = [c_void_p, c_char_p, c_char_p, c_uint32, POINTER(c_void_p)]
ChannelSlaveClient_CreateInputStream.restype = c_void_p

ChannelSlaveClient_CreateOutputStream = odps_subproc.ChannelSlaveClient_CreateOutputStream
ChannelSlaveClient_CreateOutputStream.argtypes = [c_void_p, c_char_p, c_char_p, c_uint32, POINTER(c_void_p)]
ChannelSlaveClient_CreateOutputStream.restype = c_void_p

# --- channel API error bindings -------------------------------------------
ChannelApiError_GetErrorType = odps_subproc.ChannelApiError_GetErrorType
ChannelApiError_GetErrorType.argtypes = [c_void_p]
ChannelApiError_GetErrorType.restype = c_void_p

ChannelApiError_GetErrorMessage = odps_subproc.ChannelApiError_GetErrorMessage
ChannelApiError_GetErrorMessage.argtypes = [c_void_p]
ChannelApiError_GetErrorMessage.restype = c_void_p

ChannelApiError_delete = odps_subproc.ChannelApiError_delete
ChannelApiError_delete.argtypes = [c_void_p]

# Chunk size (bytes) used when draining an input stream with ``read(-1)``.
READ_CHUNK = 2048

_CLOSED_MSG = 'I/O operation on closed file.'


def _read_std_string_ptr(std_string):
    """Copy a native string handle into a Python buffer and free the handle.

    :param std_string: opaque pointer accepted by the
        ``Subprocess_StdString_*`` functions (presumably a heap-allocated
        ``std::string*`` — TODO confirm against the C side).
    :return: a ctypes char buffer whose ``.raw`` holds exactly
        ``Subprocess_StdString_Size(std_string)`` bytes.

    The handle is always released with ``Subprocess_StdString_delete`` and
    must not be used afterwards.
    """
    try:
        std_string_len = Subprocess_StdString_Size(std_string)
        p = create_string_buffer(std_string_len)
        Subprocess_StdString_CopyTo(p, std_string, std_string_len)
    finally:
        Subprocess_StdString_delete(std_string)
    return p


def _call_with_raise(func, *args):
    """Call a native function that reports failures through an out-pointer.

    ``func`` is called with ``args`` followed by a pointer to a ``void*``
    error slot. A truthy (non-NULL) return value is returned unchanged.

    :raises CupidCppError: when ``func`` returns NULL. The error type and
        message are read from the native error object, which is then
        released.
    """
    err_pp = c_void_p()
    res = func(*(args + (byref(err_pp),)))
    if res:
        return res

    err_ptr = err_pp.value
    if not err_ptr:
        # NULL result with no error object: querying a NULL error pointer
        # would crash the interpreter, so fail in Python instead.
        raise CupidCppError(b'UnknownError',
                            b'native call returned NULL without reporting an error')

    try:
        err_type = _read_std_string_ptr(ChannelApiError_GetErrorType(err_ptr)).raw
        err_message = _read_std_string_ptr(ChannelApiError_GetErrorMessage(err_ptr)).raw
    finally:
        ChannelApiError_delete(err_ptr)
    raise CupidCppError(err_type, err_message)


class IStreamWrapper(IOBase):
    """File-like reader over a native input-stream handle.

    :param ptr: opaque stream pointer passed to ``Subprocess_StdIStream_*``.
    :param free: if True, ``close()`` releases the handle with
        ``Subprocess_StdIStream_delete``; pass False when another object
        owns it.

    Reading from a stream whose native EOF flag is already set raises
    :class:`SubprocessStreamEOFError`. Using the wrapper after it is closed
    raises ``ValueError``.
    """

    def __init__(self, ptr, free=True):
        self._ptr = ptr
        self._free = free

    def __del__(self):
        self.close()

    def close(self):
        if self._free and self._ptr is not None:
            Subprocess_StdIStream_delete(self._ptr)
        self._ptr = None

    @property
    def closed(self):
        return self._ptr is None

    def _reset(self):
        # Forget the handle without freeing it (used when the native side
        # has taken ownership of the stream).
        self._ptr = None

    def _check_open(self):
        # Every native call dereferences the handle; passing NULL would
        # crash the process instead of raising a Python error.
        if self._ptr is None:
            raise ValueError(_CLOSED_MSG)

    def readable(self):
        return self._ptr is not None

    def read(self, size=-1):
        """Read up to ``size`` bytes, or the whole remaining stream.

        :param size: maximum number of bytes; None or a negative value
            reads until the native stream is exhausted.
        :return: the bytes actually read (may be shorter than ``size``).
        :raises SubprocessStreamEOFError: if the stream is already at EOF.
        """
        self._check_open()
        if Subprocess_StdIStream_eof(self._ptr):
            raise SubprocessStreamEOFError('Subprocess stream exhausted')

        if size is None or size < 0:
            buf_io = BytesIO()
            while True:
                try:
                    chunk = self.read(READ_CHUNK)
                except SubprocessStreamEOFError:
                    chunk = b''
                buf_io.write(chunk)
                # A short chunk means the native stream ran dry.
                if len(chunk) < READ_CHUNK:
                    break
            return buf_io.getvalue()

        p = create_string_buffer(size)
        n_read = Subprocess_StdIStream_read(self._ptr, p, size)
        # Only the first ``n_read`` bytes were filled in by the native read.
        # The rest of the buffer is NUL padding and must not be returned.
        return p.raw[:n_read]

    def readinto(self, b, offset=0):
        """Read into the writable buffer ``b`` starting at ``offset``.

        :return: the number of bytes written into ``b``.
        :raises SubprocessStreamEOFError: if the stream is at EOF before the
            read, or nothing could be read because EOF was reached.
        """
        self._check_open()
        if Subprocess_StdIStream_eof(self._ptr):
            raise SubprocessStreamEOFError('Subprocess stream exhausted')
        size = len(b) - offset
        array_cls = c_char * size
        # Zero-copy view onto ``b`` so the native side writes in place.
        p = array_cls.from_buffer(b, offset)
        ret_size = Subprocess_StdIStream_read(self._ptr, p, size)
        if ret_size == 0 and Subprocess_StdIStream_eof(self._ptr):
            raise SubprocessStreamEOFError('Subprocess stream exhausted')
        return ret_size

    def readline(self, size=-1):
        """Read one line via the native ``getline`` and return it as bytes."""
        self._check_open()
        # NOTE(review): the binding declares a single argument. ``size`` is
        # forwarded as an extra cdecl argument and is presumably ignored by
        # the native side; the returned line is not truncated here.
        str_ptr = Subprocess_StdIStream_getline(self._ptr, size)
        return _read_std_string_ptr(str_ptr).raw

    def readlines(self, hint=-1):
        """Read lines until EOF, or until at least ``hint`` bytes were read.

        As with :meth:`io.IOBase.readlines`, a ``hint`` of None, 0 or a
        negative number means "no limit".
        """
        self._check_open()
        limit = hint if hint is not None and hint > 0 else None
        lines = []
        total = 0
        while True:
            line = self.readline()
            lines.append(line)
            total += len(line)
            if Subprocess_StdIStream_eof(self._ptr):
                break
            if limit is not None and total >= limit:
                break
        return lines

    def __iter__(self):
        while True:
            line = self.readline()
            yield line
            # Stop if the consumer closed the stream while iterating, or
            # if the native stream reports EOF.
            if self._ptr is None or Subprocess_StdIStream_eof(self._ptr):
                break

    def seek(self, offset, whence=0):
        self._check_open()
        Subprocess_StdIStream_seekg(self._ptr, offset, whence)

    def tell(self):
        self._check_open()
        return Subprocess_StdIStream_tellg(self._ptr)


class OStreamWrapper(IOBase):
    """File-like writer over a native output-stream handle.

    :param ptr: opaque stream pointer passed to ``Subprocess_StdOStream_*``.
    :param free: if True, ``close()`` releases the handle with
        ``Subprocess_StdOStream_delete``.
    """

    def __init__(self, ptr, free=True):
        self._ptr = ptr
        self._free = free

    def __del__(self):
        if self._ptr is not None:
            self.flush()
        self.close()

    def close(self):
        if self._free and self._ptr is not None:
            Subprocess_StdOStream_delete(self._ptr)
        self._ptr = None

    @property
    def closed(self):
        return self._ptr is None

    def _reset(self):
        # Forget the handle without freeing it (the native side owns it now).
        self._ptr = None

    def _check_open(self):
        # Writing through a NULL handle would crash the process.
        if self._ptr is None:
            raise ValueError(_CLOSED_MSG)

    def writable(self):
        return self._ptr is not None

    def write(self, data, length=None):
        """Write the first ``length`` bytes of ``data`` (all of it if None).

        :return: the number of bytes handed to the native stream.
        """
        self._check_open()
        # An explicit ``length=0`` means "write nothing", not "write all".
        if length is None:
            length = len(data)
        atype = c_char * length
        Subprocess_StdOStream_write(self._ptr, atype.from_buffer_copy(data), length)
        return length

    def flush(self):
        # Flushing a closed writer is a no-op rather than an error.
        if self._ptr is not None:
            Subprocess_StdOStream_flush(self._ptr)


class ChannelResultInputStreamWrapper(IStreamWrapper):
    """Input stream obtained from a channel call, plus its final result.

    Owns the native result handle; the underlying stream handle is borrowed
    from it (``free=False``).
    """

    def __init__(self, ptr):
        self._result_ptr = ptr
        is_ptr = ChannelResultInputStreamPtr_GetStream(ptr)
        super(ChannelResultInputStreamWrapper, self).__init__(is_ptr, False)

    def __del__(self):
        self.close()

    def fileno(self):
        if self._result_ptr is None:
            raise ValueError(_CLOSED_MSG)
        return ChannelResultInputStreamPtr_GetFD(self._result_ptr)

    def result(self, timeout=-1):
        """Fetch the call's result payload as bytes.

        :raises CupidCppError: if the native side reports an error.
        """
        if self._result_ptr is None:
            raise ValueError(_CLOSED_MSG)
        # The stream object is already freed on the C side at this point,
        # so drop our borrowed handle without deleting it.
        self._reset()
        pstr = _call_with_raise(ChannelResultInputStreamPtr_GetResult,
                                self._result_ptr, timeout)
        return _read_std_string_ptr(pstr).raw

    def close(self):
        if self._result_ptr is not None:
            ChannelResultInputStreamPtr_delete(self._result_ptr)
            self._result_ptr = None
        super(ChannelResultInputStreamWrapper, self).close()


class ChannelResultOutputStreamWrapper(OStreamWrapper):
    """Output stream obtained from a channel call, plus its final result.

    Owns the native result handle; the underlying stream handle is borrowed
    from it (``free=False``).
    """

    def __init__(self, ptr):
        self._result_ptr = ptr
        os_ptr = ChannelResultOutputStreamPtr_GetStream(ptr)
        super(ChannelResultOutputStreamWrapper, self).__init__(os_ptr, False)

    def __del__(self):
        self.close()

    def fileno(self):
        if self._result_ptr is None:
            raise ValueError(_CLOSED_MSG)
        return ChannelResultOutputStreamPtr_GetFD(self._result_ptr)

    def result(self, timeout=-1):
        """Fetch the call's result payload as bytes.

        :raises CupidCppError: if the native side reports an error.
        """
        if self._result_ptr is None:
            raise ValueError(_CLOSED_MSG)
        # The stream object is already freed on the C side at this point,
        # so drop our borrowed handle without deleting it.
        self._reset()
        pstr = _call_with_raise(ChannelResultOutputStreamPtr_GetResult,
                                self._result_ptr, timeout)
        return _read_std_string_ptr(pstr).raw

    def close(self):
        if self._result_ptr is not None:
            self.flush()
            ChannelResultOutputStreamPtr_delete(self._result_ptr)
            self._result_ptr = None
        super(ChannelResultOutputStreamWrapper, self).close()


class ChannelOutputWriter(object):
    """Writer that waits for the channel result when closed.

    Usable as a context manager; ``close()`` is idempotent.
    """

    def __init__(self, stream):
        self._stream = stream
        self._finished = False

    def write(self, data):
        return self._stream.write(data)

    def close(self):
        # Closing twice (e.g. an explicit close() inside a ``with`` block)
        # must not query the already-freed native result again.
        if self._finished:
            return
        self._finished = True
        try:
            # sync: wait for the native side to report the result
            self._stream.result()
        finally:
            # Release the native result handle even if result() raised.
            self._stream.close()

    @property
    def closed(self):
        return self._stream.closed

    def flush(self):
        self._stream.flush()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()


class ChannelConf(object):
    """Owner of a native channel configuration object."""

    def __init__(self):
        self._ptr = ChannelConf_new()

    def __del__(self):
        # getattr: __init__ may have failed before ``_ptr`` was assigned.
        ptr = getattr(self, '_ptr', None)
        if ptr is not None:
            ChannelConf_delete(ptr)
            self._ptr = None

    def set_integer(self, key, value):
        if isinstance(key, six.string_types):
            key = key.encode()
        ChannelConf_SetInt(self._ptr, key, value)


class ChannelSlaveClient(object):
    """Client for the native channel used to talk to the Cupid master.

    :param request_channel_nums: passed to ``ChannelSlaveClient_new``.
    :param response_channel_nums: passed to ``ChannelSlaveClient_new``.
    :param client_name: client name (str or bytes).
    """

    def __init__(self, request_channel_nums, response_channel_nums, client_name):
        if isinstance(client_name, six.string_types):
            client_name = client_name.encode()
        self._ptr = ChannelSlaveClient_new(request_channel_nums, response_channel_nums,
                                           client_name)

    def __del__(self):
        # getattr: __init__ may have failed before ``_ptr`` was assigned.
        ptr = getattr(self, '_ptr', None)
        if ptr is not None:
            ChannelSlaveClient_delete(ptr)
            self._ptr = None

    def start(self):
        ChannelSlaveClient_Start(self._ptr)

    def stop(self):
        ChannelSlaveClient_Stop(self._ptr)

    def sync_call(self, method, parameter, timeout=-1):
        """Invoke ``method`` synchronously and return its raw bytes result.

        :raises CupidCppError: if the native call fails.
        """
        method = to_binary(method)
        parameter = to_binary(parameter)
        result_ptr = _call_with_raise(ChannelSlaveClient_SyncCall, self._ptr, method,
                                      parameter, len(parameter), timeout)
        return _read_std_string_ptr(result_ptr).raw

    def create_file_reader(self, method, params):
        if isinstance(method, six.string_types):
            method = method.encode()
        if isinstance(params, six.string_types):
            params = params.encode()
        ptr = _call_with_raise(ChannelSlaveClient_CreateInputStream, self._ptr,
                               method, params, len(params))
        return ChannelResultInputStreamWrapper(ptr)

    def create_file_writer(self, method, params):
        if isinstance(method, six.string_types):
            method = method.encode()
        if isinstance(params, six.string_types):
            params = params.encode()
        ptr = _call_with_raise(ChannelSlaveClient_CreateOutputStream, self._ptr,
                               method, params, len(params))
        return ChannelResultOutputStreamWrapper(ptr)

    def create_record_reader(self, label, schema, columns=None):
        from ..io.table import CupidRecordReader

        params = dict(type='ReadByLabel', label=label, arrow=False, batch=False)
        stream = self.create_file_reader('createTableInputStream',
                                         json.dumps(params).encode())
        return CupidRecordReader(schema, stream, columns=columns)

    def create_record_writer(self, label, schema):
        from ..io.table import CupidRecordWriter

        # Key is lower-case ``batch``, matching create_record_reader and
        # create_arrow_writer (previously sent as ``Batch``).
        params = json.dumps(dict(type='WriteByLabel', label=label, arrow=False, batch=False))
        stream = self.create_file_writer('createTableOutputStream', params.encode())
        return CupidRecordWriter(schema, stream)

    def create_arrow_writer(self, label):
        params = json.dumps(dict(type='WriteByLabel', label=label, arrow=True, batch=True))
        stream = self.create_file_writer('createTableOutputStream', params.encode())
        return ChannelOutputWriter(stream)

    create_table_reader = create_record_reader
    create_table_writer = create_record_writer