# datahub/batch/binary_record.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from decimal import Decimal
from .utils import *
from .record_header import RecordHeader, RECORD_HEADER_SIZE
from ..models import FieldType
from ..utils.converters import to_binary, to_text
from ..exceptions import InvalidParameterException, DatahubException
BYTE_SIZE_ONE_FIELD = 8  # every fixed field slot in the record body is 8 bytes
FIELD_COUNT_BYTE_SIZE = 4  # the field-count prefix after the header is a 4-byte int
INT_BYTE_SIZE = 4  # generic 4-byte int used for lengths, offsets and counts
PADDING_BYTES = b'\x00'  # single zero byte repeated for alignment padding
class BinaryRecord:
    """A single DataHub record in its binary wire format.

    Serialized layout (offsets in bytes, as produced by ``serialize``):

        [RecordHeader]
        [field count : 4]
        [null bitmap : ceil(N/64)*8 bytes, 1 bit per field]
        [N fixed field slots : N*8 bytes]
        [variable-length field payloads : 8-byte aligned]
        [attribute count : 4]
        [key len : 4][key][value len : 4][value] ... per attribute

    A set bit in the null bitmap means the field HAS a value (non-null) --
    see ``__is_field_none``. A record without a schema is a BLOB record with
    exactly one field slot holding raw bytes.
    """

    def __init__(self, schema=None, version_id=None, buffer=None, header=None):
        """Create an empty (write path) or buffer-backed (read path) record.

        :param schema: record schema; ``None`` means a single-field BLOB record
        :param version_id: schema version id written into the record header
        :param buffer: serialized bytes to read from (read path), else empty
        :param header: parsed RecordHeader (read path only)
        """
        # A schema-less (BLOB) record still occupies one field slot.
        self._field_cnt = len(schema.field_list) if schema else 1
        # Running byte size of the attribute section (lengths + payloads).
        self._attr_length = 0
        self._version_id = version_id
        self._schema = schema
        self._buffer = buffer if buffer else bytes()
        self._attr_map = dict()
        self._has_init_attr_map = False
        # One int per bitmap byte: ceil(field_cnt / 64) * 8 bytes in total,
        # i.e. one bit per field rounded up to 8-byte groups.
        self._null_bit = [0] * (((self._field_cnt + 63) >> 6) << 3)  # N/8
        # Fixed 8-byte slots; __set_byte_field may append extra entries past
        # _field_cnt holding variable-length payloads.
        self._fields = [bytes()] * self._field_cnt  # N*8
        # Buffer offset of the first fixed field slot.
        self._field_pos = RECORD_HEADER_SIZE + FIELD_COUNT_BYTE_SIZE + len(self._null_bit)
        # Offset just past the fixed slots; grows as variable payloads are added.
        self._next_pos = self._field_pos + self._field_cnt * BYTE_SIZE_ONE_FIELD
        # RecordHeader
        self._record_header = header

    # =======================
    # put record
    # =======================
    def serialize(self):
        """Encode this record into its binary form and return the buffer.

        Appends, in order: record header, field count, null bitmap, all
        field slots (plus any variable payloads), then the attribute map.

        :return: the accumulated ``bytes`` buffer
        :raises DatahubException: if the header does not serialize to the
            expected fixed size
        """
        # record header (first argument is presumably an encode-mode flag
        # consumed by RecordHeader.serialize -- TODO confirm against
        # record_header.py)
        record_head_byte = RecordHeader.serialize(0, self._version_id, self.__get_record_size(), self._next_pos)
        if len(record_head_byte) != RECORD_HEADER_SIZE:
            raise DatahubException("Record header size is {}, should be {}".format(len(record_head_byte), RECORD_HEADER_SIZE))
        self.__append(record_head_byte)
        # field count
        field_count_byte = int2byte(self._field_cnt)
        self.__append(field_count_byte)
        # NullBit: N/8 -- one byte per bitmap entry
        for null_bit in self._null_bit:
            null_bit_byte = int2byte(null_bit, size=1, unsigned=True)
            self.__append(null_bit_byte)
        # Field: N*8 fixed slots, followed by any variable-length payload
        # entries appended past _field_cnt by __set_byte_field
        for field_data in self._fields:
            self.__append(field_data)
        # attribute map: count, then (key length, key, value length, value)
        attr_map_len_byte = int2byte(len(self._attr_map))
        self.__append(attr_map_len_byte)
        for key, val in self._attr_map.items():
            key_len_byte = int2byte(len(key))
            self.__append(key_len_byte)
            key_byte = to_binary(key)
            self.__append(key_byte)
            val_len_byte = int2byte(len(val))
            self.__append(val_len_byte)
            val_byte = to_binary(val)
            self.__append(val_byte)
        return self._buffer

    def set_field(self, pos, value):
        """Encode ``value`` into the field slot at ``pos``.

        BLOB records (no schema) accept only ``bytes`` in slot 0. Tuple
        records encode according to the schema field type: strings/decimals
        go through the variable-length path, fixed-width types are packed
        and zero-padded to the 8-byte slot width.

        :raises InvalidParameterException: bad position, or non-bytes value
            for a BLOB record
        :raises DatahubException: unsupported field type
        """
        # Despite the helper's name this marks the field as NON-null in the
        # bitmap (see __set_none / __is_field_none).
        self.__set_none(pos)
        if self._schema is None:  # Blob
            if not isinstance(value, bytes):
                raise InvalidParameterException("Only support write bytes for no schema")
            self._fields[0] = self.__set_byte_field(value)
        else:  # Tuple
            field = self._schema.get_field(pos)
            field_type = field.type
            if field_type is FieldType.STRING or field_type is FieldType.DECIMAL:
                # DECIMAL is transported as its string representation
                value_byte = self.__set_byte_field(to_binary(str(value)))
            elif field_type is FieldType.BOOLEAN:
                value_byte = bool2byte(value)
                value_byte += PADDING_BYTES * 7  # 1 byte -> 8-byte slot
            elif field_type is FieldType.FLOAT:
                value_byte = float2byte(value)
                value_byte += PADDING_BYTES * 4  # 4 bytes -> 8-byte slot
            elif field_type is FieldType.DOUBLE:
                value_byte = double2byte(value)
            elif field_type in (FieldType.TINYINT, FieldType.SMALLINT, FieldType.INTEGER,
                                FieldType.BIGINT, FieldType.TIMESTAMP):
                # all integer-like types are widened to 8 bytes on the wire
                value_byte = int2byte(value, size=8)
            else:
                raise DatahubException("Error field type. {}".format(field_type))
            self._fields[pos] = value_byte

    def add_attribute(self, key, value):
        """Add a key/value attribute and grow the tracked attribute size."""
        self._attr_map[key] = value
        # two 4-byte length prefixes plus the key and value payloads
        self._attr_length += (INT_BYTE_SIZE * 2 + len(key) + len(value))
        # force the attribute section to be re-parsed on the read path
        self._has_init_attr_map = False

    def __set_byte_field(self, value):
        """Encode a variable-length bytes value into an 8-byte field slot.

        Values of at most 7 bytes are stored inline: padded to 7 bytes, with
        the last slot byte carrying ``0x80 | length``. Longer values are
        appended after the fixed slots (8-byte aligned) and the slot holds
        length(4 bytes) + offset(4 bytes), the offset being relative to the
        end of the RecordHeader.

        :return: the 8-byte slot content to store at the field position
        """
        value_byte_len = len(value)
        if value_byte_len <= 7:
            value += (PADDING_BYTES * (7 - value_byte_len))
            # high bit of the final byte flags "inline", low bits hold length
            value += int2byte(value_byte_len | 0x80, size=1, unsigned=True)
            return value
        # offset from the end of RecordHeader
        value_offset = FIELD_COUNT_BYTE_SIZE + len(self._null_bit) + self._field_cnt * 8
        # entries past _field_cnt are previously appended variable payloads
        for i in range(self._field_cnt, len(self._fields)):
            value_offset += len(self._fields[i])
        # length(4 Byte) + offset(4 Byte)
        value_byte = int2byte(value_byte_len, size=4) + int2byte(value_offset, size=4)
        # add padding to 8N Byte
        # NOTE(review): when len(value) is already a multiple of 8 this appends
        # a full extra 8-byte pad block; offsets stay self-consistent because
        # they are summed from stored lengths, but confirm this matches the
        # server-side format before changing it.
        value += (PADDING_BYTES * (BYTE_SIZE_ONE_FIELD - (len(value) % BYTE_SIZE_ONE_FIELD)))
        self._fields.append(value)
        self._next_pos += len(value)
        return value_byte

    def __append(self, src_buffer):
        # bytes concatenation; fine for the record sizes involved here
        self._buffer += src_buffer

    def __set_none(self, pos, none=False):
        """Set the presence bit for the field at ``pos`` in the null bitmap.

        NOTE(review): despite the name, this always SETS the bit (set bit ==
        non-null per __is_field_none); the ``none`` flag only skips position
        validation -- confirm the intended semantics for none=True callers.
        """
        if not none:
            self.__check_pos_valid(pos)
        index = pos >> 3
        self._null_bit[index] |= (1 << (pos & 0x07))

    # =======================
    # get record
    # =======================
    @classmethod
    def deserialize(cls, schema, buffer, record_header):
        """Build a BinaryRecord by parsing a serialized ``buffer``.

        Populates the null bitmap, decodes every field value into
        ``_fields`` and eagerly parses the attribute map.

        :param schema: record schema (``None`` for BLOB records)
        :param buffer: full serialized record bytes
        :param record_header: pre-parsed RecordHeader for this buffer
        :return: a populated BinaryRecord instance
        """
        binary_record = cls(schema, record_header.schema_version, buffer, record_header)
        # Deserialize null_bit, one bitmap byte at a time
        null_bit_pos = RECORD_HEADER_SIZE + FIELD_COUNT_BYTE_SIZE
        for index in range(len(binary_record._null_bit)):
            binary_record._null_bit[index] = byte2int(binary_record._buffer[null_bit_pos:null_bit_pos + 1], size=1)
            null_bit_pos += 1
        # Deserialize fields: slots are replaced by their decoded values
        for i in range(binary_record._field_cnt):
            binary_record._fields[i] = binary_record.__get_field(i)
        # Deserialize attribute map
        binary_record.__init_attr_map_if_need()
        return binary_record

    def get_field(self, pos):
        """Return the value stored at ``pos``.

        After ``deserialize`` this is the decoded Python value (or ``None``
        for a null field); on the write path it is the raw slot bytes.

        :raises InvalidParameterException: position out of range
        """
        self.__check_pos_valid(pos)
        return self._fields[pos]

    def get_attribute(self):
        """Return the attribute map (dict of str -> str)."""
        return self._attr_map

    def __get_field(self, pos):
        """Decode the field at ``pos`` from the buffer per its schema type."""
        if self.__is_field_none(pos):
            return None
        field_byte = self.__read_field(pos)
        if self._schema is None:  # BLOB
            return self.__get_byte_field(field_byte)
        else:  # TUPLE
            field_type = self._schema.get_field(pos).type
            value = None
            # length = 1
            if field_type is FieldType.BOOLEAN:
                value = byte2bool(field_byte[:1])
            elif field_type is FieldType.TINYINT:
                value = byte2int(field_byte[:1], size=1)
            # length = 2
            elif field_type is FieldType.SMALLINT:
                value = byte2int(field_byte[:2], size=2)
            # length = 4
            elif field_type is FieldType.INTEGER:
                value = byte2int(field_byte[:4], size=4)
            elif field_type is FieldType.FLOAT:
                value = byte2float(field_byte[:4])
            # length = 8
            elif field_type is FieldType.BIGINT or field_type is FieldType.TIMESTAMP:
                value = byte2int(field_byte, size=8)
            elif field_type is FieldType.DOUBLE:
                value = byte2double(field_byte)
            # length = ?
            elif field_type is FieldType.STRING or field_type is FieldType.DECIMAL:
                value = self.__get_str_field(field_byte)
                if field_type is FieldType.DECIMAL:
                    value = Decimal(value)
            # NOTE(review): an unrecognized field type falls through and
            # returns None here, whereas set_field raises -- confirm this
            # asymmetry is intentional.
            return value

    def __get_byte_field(self, value_byte):
        """Decode a variable-length value from its 8-byte field slot.

        Mirrors __set_byte_field: bit 0x80 of the LAST slot byte marks an
        inline short value (length in the low 3 bits of that byte);
        otherwise the slot holds length(4) + offset(4) and the payload is
        sliced out of the full buffer.
        """
        data = byte2int(value_byte, size=8)
        # The last wire byte lands in the top bits of `data`, which implies
        # the utils helpers pack little-endian (consistent with the writer
        # storing length in the first 4 bytes and offset in the last 4).
        is_little_str = (data & (0x80 << 56)) != 0
        if is_little_str:
            str_size = ((data >> 56) & 0x07)
            value = value_byte[:str_size]
        else:
            # offset is relative to the end of the RecordHeader
            str_offset = RECORD_HEADER_SIZE + (data >> 32)
            str_size = byte2int(value_byte[:4], size=4)
            value = self._buffer[str_offset:str_offset + str_size]
            # NOTE(review): shrinking _next_pos inside a read path looks like
            # size bookkeeping for __get_record_size -- confirm intent.
            self._next_pos -= str_size
        return value

    def __get_str_field(self, value_byte):
        """Decode a variable-length slot and return it as text."""
        return to_text(self.__get_byte_field(value_byte))

    def __is_field_none(self, pos):
        """Return True if the field at ``pos`` is null (presence bit clear)."""
        self.__check_pos_valid(pos)
        index = pos >> 3
        value = self._null_bit[index] & (1 << (pos & 0x07))
        return value == 0

    def __init_attr_map_if_need(self):
        """Lazily parse the attribute section of the buffer, at most once.

        Reads count(4), then per entry: key_len(4), key, val_len(4), value,
        starting at the offset recorded in the record header.
        """
        if self._has_init_attr_map:
            return
        offset = self._record_header.attr_offset
        attr_size = byte2int(self._buffer[offset:offset+4])
        # NOTE(review): _attr_map is initialized to a dict in __init__ and is
        # never None here, so this guard appears to be dead code.
        if attr_size != 0 and self._attr_map is None:
            self._attr_map = dict()
        offset += INT_BYTE_SIZE
        for i in range(attr_size):
            key_size = byte2int(self._buffer[offset:offset+4])
            offset += INT_BYTE_SIZE
            key_str = to_text(self._buffer[offset:offset+key_size])
            offset += key_size
            val_size = byte2int(self._buffer[offset:offset+4])
            offset += INT_BYTE_SIZE
            value_str = to_text(self._buffer[offset:offset+val_size])
            offset += val_size
            self._attr_map[key_str] = value_str
            self._attr_length += (key_size + val_size + 2*INT_BYTE_SIZE)
        self._has_init_attr_map = True

    def __get_field_offset(self, pos):
        # buffer offset of the fixed 8-byte slot for field `pos`
        return self._field_pos + pos * BYTE_SIZE_ONE_FIELD

    def __read_field(self, pos):
        """Return the raw 8-byte slot for field ``pos`` from the buffer."""
        offset = self.__get_field_offset(pos)
        return self._buffer[offset:offset + BYTE_SIZE_ONE_FIELD]

    # =======================
    # common
    # =======================
    def __get_record_size(self):
        # total record size: attribute-count int + accumulated attribute
        # bytes + everything up to the end of the field data
        return INT_BYTE_SIZE + self._attr_length + self._next_pos

    def __check_pos_valid(self, pos):
        """Raise InvalidParameterException if ``pos`` is out of range."""
        if pos < 0 or pos >= self._field_cnt:
            raise InvalidParameterException("Invalid position. position: {}, fieldCount: {}".format(pos, self._field_cnt))

    @property
    def field_cnt(self):
        # number of field slots (1 for BLOB records)
        return self._field_cnt

    @property
    def version_id(self):
        # schema version id carried in the record header
        return self._version_id

    @property
    def schema(self):
        return self._schema

    @schema.setter
    def schema(self, schema):
        self._schema = schema