datahub/batch/batch_binary_record.py (65 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import crcmod.predefined
from .binary_record import BinaryRecord
from .batch_header import BatchHeader, BATCH_HEAD_SIZE
from ..models.compress import *
from ..exceptions import DatahubException, InvalidParameterException
class BatchBinaryRecord:
"""
Batch binary record
"""
def __init__(self, records=None):
self._version = None
self._length = None
self._raw_size = None
self._crc32 = None
self._attributes = None
self._record_count = None
self._records = records if records else [] # list of BinaryRecord
self._buffer = bytes()
def add_record(self, record):
if not record or not isinstance(record, BinaryRecord):
raise InvalidParameterException("Add record fail. record must be a valid BinaryRecord instance")
self._records.append(record)
def serialize(self, compress_type=None):
try:
# Add BinaryRecord list
for record in self._records:
record_byte = record.serialize()
self._buffer += record_byte
# compress
self.__compress(compress_type)
crc32c = crcmod.predefined.mkCrcFun('crc-32c')
self._crc32 = crc32c(self._buffer) & 0xffffffff
self._version = 0
self._record_count = len(self._records)
# Add Batch header
header_byte = BatchHeader.serialize(
self._version,
self._length,
self._raw_size,
self._crc32,
self._attributes,
self._record_count
)
if len(header_byte) != BATCH_HEAD_SIZE:
raise DatahubException("Batch header size should be {}, it is {}".format(BATCH_HEAD_SIZE, len(header_byte)))
return header_byte + self._buffer
except Exception as e:
raise DatahubException("Serialize batch record fail. {}".format(e))
def __compress(self, compress_type=None):
self._raw_size = len(self._buffer)
self._length = self._raw_size + BATCH_HEAD_SIZE
try:
data_compressor = get_compressor(compress_type)
compress_data = data_compressor.compress(self._buffer)
if len(compress_data) < self._raw_size:
self._attributes = compress_type.get_index() | 8
self._buffer = compress_data
self._length = BATCH_HEAD_SIZE + len(compress_data)
else:
self._attributes = CompressFormat.NONE.get_index() | 8
except Exception as e:
raise DatahubException("Compress data fail. {}".format(e))
@property
def records(self):
return self._records
@property
def buffer(self):
return self._buffer
@buffer.setter
def buffer(self, buffer):
self._buffer = buffer