python/pyfury/format/row.pxi (312 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from libcpp.memory cimport make_shared
from libc.stdint cimport *
from cython.operator cimport dereference as deref
from datetime import timedelta
from pyfury.includes.libformat cimport CGetter, CArrayData, CMapData, CRow
from pyfury._util cimport Buffer
from libcpp.memory cimport shared_ptr
from datetime import datetime, date
from libc.stdint cimport *
from libcpp cimport bool as c_bool
from cpython cimport *
from pyarrow.lib cimport Schema, DataType, ListType, MapType, Field
import pyarrow as pa
from pyarrow import types
cdef dict reader_map = {}
cdef class Getter:
cdef:
CGetter* getter
cdef inline c_bool is_null_at(self, int i):
return self.getter.IsNullAt(i)
cpdef get_boolean(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetBoolean(i)
cpdef get_int8(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetInt8(i)
cpdef get_int16(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetInt16(i)
cpdef get_int32(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetInt32(i)
cpdef get_int64(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetInt64(i)
cpdef get_float(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetFloat(i)
cpdef get_double(self, int i):
if self.is_null_at(i):
return None
return self.getter.GetDouble(i)
cpdef get_date(self, int i):
if self.is_null_at(i):
return None
cdef int32_t days = self.getter.GetInt32(i)
return date(1970, 1, 1) + timedelta(days=days)
cpdef get_datetime(self, int i):
if self.is_null_at(i):
return None
cdef int64_t timestamp = self.getter.GetInt64(i)
# TimestampType represent micro seconds
return datetime.fromtimestamp(float(timestamp) / 1000000)
cpdef get_binary(self, int i):
if self.is_null_at(i):
return None
cdef unsigned char* binary_data
cdef int32_t size = self.getter.GetBinary(i, &binary_data)
return binary_data[:size]
cpdef get_str(self, int i):
if self.is_null_at(i):
return None
cdef unsigned char* binary_data
cdef int32_t size = self.getter.GetBinary(i, &binary_data)
return binary_data[:size].decode("UTF-8")
cpdef RowData get_struct(self, int i):
pass
cpdef ArrayData get_array_data(self, int i):
pass
cpdef MapData get_map_data(self, int i):
pass
cdef class ArrayData(Getter):
cdef:
ListType type_
shared_ptr[CArrayData] data
def __init__(self):
raise TypeError("Do not call constructor directly, use "
"factory function instead.")
@staticmethod
cdef ArrayData wrap(shared_ptr[CArrayData] data, ListType array_type):
cdef ArrayData array_data = ArrayData.__new__(ArrayData)
array_data.getter = data.get()
array_data.data = data
array_data.type_ = array_type
return array_data
@property
def num_elements(self) -> int:
return self.data.get().num_elements()
def buffer(self) -> Buffer:
return Buffer.wrap(self.data.get().buffer())
def base_offset(self) -> int:
return self.data.get().base_offset()
def size_bytes(self) -> int:
return self.data.get().size_bytes()
cpdef RowData get_struct(self, int i):
cdef DataType data_type = self.type_.value_type
# assert_type(i, data_type, StructType)
if self.is_null_at(i):
return None
return RowData.wrap(self.data.get().GetStruct(i), pa.schema(data_type))
cpdef ArrayData get_array_data(self, int i):
cdef DataType data_type = self.type_.value_type
if self.is_null_at(i):
return None
return ArrayData.wrap(self.data.get().GetArray(i), data_type)
cpdef MapData get_map_data(self, int i):
cdef DataType data_type = self.type_.value_type
if self.is_null_at(i):
return None
cdef shared_ptr[CMapData] v = self.data.get().GetMap(i)
return MapData.wrap(v, data_type)
def __getitem__(self, i):
if i > self.num_elements or i < 0:
raise IndexError("length is {}, but index is {}"
.format(self.num_elements, i))
return self.get(i)
def get(self, int i):
key = id(self.type_.value_type)
reader = reader_map.get(key)
if reader is None:
reader = get_reader(self.type_.value_type, type(self))
reader_map[key] = reader
if self.is_null_at(i):
return None
else:
# cdef methods don't bind self.
return reader(self, i)
def __dealloc__(self):
reader_map.pop(id(self.type_.value_type), None)
def __str__(self) -> str:
cdef:
int length = self.num_elements
int i
str result = "["
getter = get_reader(self.type_.value_type, type(self))
for i in range(length):
if i != 0:
result += ','
if self.is_null_at(i):
result += "null"
else:
result += str(getter(self, i))
result += ']'
return result
cdef class MapData:
cdef:
shared_ptr[CMapData] data
MapType map_type
def __init__(self):
raise TypeError("Do not call constructor directly, use "
"factory function instead.")
@staticmethod
cdef MapData wrap(shared_ptr[CMapData] data, MapType map_type):
cdef MapData map_data = MapData.__new__(MapData)
map_data.data = data
map_data.map_type = map_type
return map_data
@property
def num_elements(self) -> int:
return self.data.get().num_elements()
def buffer(self) -> Buffer:
return Buffer.wrap(self.data.get().buffer())
def base_offset(self) -> int:
return self.data.get().base_offset()
def size_bytes(self) -> int:
return self.data.get().size_bytes()
def keys_array(self):
array_type = pa.list_(self.map_type.key_type)
return ArrayData.wrap(self.data.get().keys_array(), array_type)
def values_array(self):
array_type = pa.list_(self.map_type.item_type)
return ArrayData.wrap(self.data.get().values_array(), array_type)
cdef keys_array_(self, DataType array_type):
return ArrayData.wrap(self.data.get().keys_array(), array_type)
cdef values_array_(self, DataType array_type):
return ArrayData.wrap(self.data.get().values_array(), array_type)
def __str__(self):
return 'Map{' + str(self.keys_array()) + ', ' + str(self.values_array()) + '}'
cdef class RowData(Getter):
cdef:
shared_ptr[CRow] data
Schema schema
Buffer _buf # hold buffer reference
def __init__(self, schema, buffer, offset=0, size_in_bytes=None):
if size_in_bytes is None:
size_in_bytes = len(buffer)
if type(buffer) is not Buffer:
buffer = Buffer(buffer, offset=offset, length=size_in_bytes)
self._buf = buffer
cdef:
Buffer buf = <Buffer>buffer
shared_ptr[CRow] row = make_shared[CRow]((<Schema>schema).sp_schema)
deref(row).PointTo(buf.c_buffer, offset, size_in_bytes)
self.data = row
self.getter = row.get()
self.schema = schema
@staticmethod
cdef RowData wrap(shared_ptr[CRow] data, Schema schema):
cdef RowData row_data = RowData.__new__(RowData)
row_data.data = data
row_data.getter = data.get()
row_data.schema = schema
return row_data
@property
def num_fields(self) -> int:
return self.data.get().num_fields()
def buffer(self) -> Buffer:
return Buffer.wrap(self.data.get().buffer())
cpdef base_offset(self):
return self.data.get().base_offset()
cpdef size_bytes(self):
return self.data.get().size_bytes()
def to_bytes(self) -> bytes:
end_offset = self.base_offset() + self.size_bytes()
return self.buffer().to_bytes()[self.base_offset():end_offset]
cpdef RowData get_struct(self, int i):
if self.is_null_at(i):
return None
cdef DataType data_type = self.schema.field(i).type
# assert_type(i, self.schema.field(i).type, StructType)
return RowData.wrap(self.data.get().GetStruct(i), pa.schema(data_type))
cpdef ArrayData get_array_data(self, int i):
if self.is_null_at(i):
return None
cdef DataType data_type = self.schema.field(i).type
return ArrayData.wrap(self.data.get().GetArray(i), data_type)
cpdef MapData get_map_data(self, int i):
if self.is_null_at(i):
return None
cdef DataType data_type = self.schema.field(i).type
cdef shared_ptr[CMapData] v = self.data.get().GetMap(i)
return MapData.wrap(v, data_type)
def __getitem__(self, i):
if not isinstance(i, int):
assert type(i) is str
i = self.schema.names.index(i)
if i > self.num_fields or i < 0:
raise IndexError("num_fields is {}, but index is {}"
.format(self.num_fields, i))
return self.get(i)
def __getattr__(self, item):
return self.__getitem__(item)
def get(self, i):
key = id(self.schema)
readers = reader_map.get(key)
if readers is None:
readers = []
for field_index in range(len(self.schema)):
readers.append(get_reader(
self.schema.field(field_index).type, type(self)))
reader_map[key] = readers
if self.is_null_at(i):
return None
else:
return readers[i](self, i)
def __dealloc__(self):
reader_map.pop(id(self.schema), None)
def __str__(self) -> str:
cdef:
Field field
int num_fields = len(self.schema)
int i
str result = "{"
for i in range(num_fields):
if i != 0:
result += ','
field = self.schema.field(i)
getter = get_reader(field.type, type(self))
result += field.name
result += '='
if self.is_null_at(i):
result += "null"
else:
result += str(getter(self, i))
result += "}"
return result
def assert_type(i, data_type, type_cls):
if not isinstance(data_type, type_cls):
raise TypeError("type for {0} is {1}, isn't {2}".
format(i, data_type, type_cls))
def get_reader(data_type, type_):
if types.is_boolean(data_type):
return type_.get_boolean
elif types.is_int8(data_type):
return type_.get_int8
elif types.is_int16(data_type):
return type_.get_int16
elif types.is_int32(data_type):
return type_.get_int32
elif types.is_int64(data_type):
return type_.get_int64
elif types.is_float32(data_type):
return type_.get_float
elif types.is_float64(data_type):
return type_.get_double
elif types.is_date32(data_type):
return type_.get_date
elif types.is_timestamp(data_type):
return type_.get_datetime
elif types.is_binary(data_type):
return type_.get_binary
elif types.is_string(data_type):
return type_.get_str
elif types.is_struct(data_type):
return type_.get_struct
elif types.is_list(data_type):
return type_.get_array_data
elif types.is_map(data_type):
return type_.get_map_data
raise TypeError("Unsupported type: " + str(data_type))