datahub/batch/batch_serializer.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import crcmod.predefined

from .binary_record import BinaryRecord
from .batch_binary_record import BatchBinaryRecord
from .record_header import RECORD_HEADER_SIZE, RecordHeader
from .batch_header import BATCH_HEAD_SIZE, BatchHeader
from ..exceptions import DatahubException
from ..models.compress import CompressFormat, get_compressor
from ..models import BlobRecord, TupleRecord, RecordSchema


class BatchSerializer:
    """
    Batch serializer.

    A serialized batch is a BatchHeader followed by a (possibly compressed)
    run of BinaryRecords laid out back to back, each prefixed by its own
    RecordHeader.
    """

@staticmethod
def serialize(compress_type, schema_object, record_list):
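        """Pack a list of TupleRecord/BlobRecord into one compressed batch payload."""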
batch = BatchBinaryRecord()
# TupleRecord/BlobRecord to BinaryRecord
binary_records = [BatchSerializer.convert_to_binary_record(record, schema_object) for record in record_list]
for record in binary_records:
batch.add_record(record)
return batch.serialize(compress_type)

    @staticmethod
def deserialize(init_schema, schema_object, byte_data):
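        """Unpack a batch payload back into a list of TupleRecord/BlobRecord."""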
# bytes --> BatchBinaryRecord
batch_records = BatchSerializer.convert_byte_to_batch_record(init_schema, schema_object, byte_data)
# BatchBinaryRecord --> list of TupleRecord/BlobRecord
record_list = [BatchSerializer.convert_to_record(record, init_schema) for record in batch_records.records]
return record_list

    # =======================
    # serialize
    # =======================
    @staticmethod
def convert_to_binary_record(record, schema_object):
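        """Convert a single TupleRecord/BlobRecord into a BinaryRecord."""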
if isinstance(record, BlobRecord):
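            # BLOB data carries no schema; version_id -1 marks the record as schema-less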
binary_record = BinaryRecord(schema=None, version_id=-1)
binary_record.set_field(0, record.blob_data)
        else:
            schema = RecordSchema(record.field_list)
            # resolve the schema version id from the register when one is
            # configured; otherwise fall back to version 0
            version_id = 0
            if schema_object.schema_register:
                version_id_new = schema_object.schema_register.get_version_id(
                    schema_object.project, schema_object.topic, schema)
                if version_id_new:
                    version_id = version_id_new
            binary_record = BinaryRecord(schema=schema, version_id=version_id)
for i in range(len(schema.field_list)):
value = record.get_value(i)
binary_record.set_field(i, value)
if record.attributes:
for key, val in record.attributes.items():
binary_record.add_attribute(key, val)
return binary_record

    # =======================
    # deserialize
    # =======================

    # BinaryRecord --> TupleRecord/BlobRecord
    @staticmethod
def convert_to_record(binary_record, init_schema):
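        """Convert a BinaryRecord back into a BlobRecord (no schema) or a TupleRecord."""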
if init_schema is None: # BLOB
# set blob data
blob_data = binary_record.get_field(0)
record = BlobRecord(blob_data=blob_data)
else: # TUPLE
# set tuple data
record = TupleRecord(field_list=None, schema=binary_record.schema, values=None)
for i in range(binary_record.field_cnt):
record.set_value(i, value=binary_record.get_field(i))
# set attribute
attr_map = binary_record.get_attribute()
for key, val in attr_map.items():
record.put_attribute(key, val)
return record

    @staticmethod
def convert_byte_to_batch_record(init_schema, schema_object, byte_data):
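        """Validate the batch header, decompress the body, and split it into BinaryRecords."""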
# Deserialize the batch header
batch_header_byte = byte_data[:BATCH_HEAD_SIZE]
batch_header = BatchHeader.deserialize(batch_header_byte)
# Check crc
crc32c = crcmod.predefined.mkCrcFun('crc-32c')
compute_crc32 = crc32c(byte_data[BATCH_HEAD_SIZE:]) & 0xffffffff
        if batch_header.crc32 != compute_crc32:
            raise DatahubException("Crc check failed. expect: {}, real: {}".format(batch_header.crc32, compute_crc32))
# Check length
        if batch_header.length != len(byte_data):
            raise DatahubException(
                "Batch header length check failed. expect: {}, real: {}".format(batch_header.length, len(byte_data)))
# Decompress
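        # the low two bits of the batch attributes encode the compress format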
compress_type = CompressFormat.get_compress(batch_header.attributes & 0x03)
all_binary_buffer = byte_data[BATCH_HEAD_SIZE:]
data_decompressor = get_compressor(compress_type)
all_binary_buffer = data_decompressor.decompress(all_binary_buffer, batch_header.raw_size)
# deserialize to list of BinaryRecord
batch_records = BatchBinaryRecord()
next_pos = 0
        for _ in range(batch_header.record_count):
            # deserialize the record header first to learn the record size
record_header = RecordHeader.deserialize(all_binary_buffer[next_pos: next_pos + RECORD_HEADER_SIZE])
total_size = record_header.total_size
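            # total_size covers the record header plus the record body, so
            # advancing the cursor by it reaches the next record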
            binary_record = BatchSerializer.convert_byte_to_binary_record(
                init_schema, schema_object, all_binary_buffer[next_pos:next_pos + total_size], record_header)
next_pos += total_size
batch_records.add_record(binary_record)
return batch_records

    @staticmethod
def convert_byte_to_binary_record(init_schema, schema_object, binary_records_buffer, record_header):
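        """Deserialize one BinaryRecord, resolving its schema by the version id in the record header."""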
        schema = init_schema
        # prefer the schema registered for the version carried in the record
        # header; fall back to the topic's initial schema
        if schema_object.schema_register:
            schema_new = schema_object.schema_register.get_schema(
                schema_object.project, schema_object.topic, record_header.schema_version)
            if schema_new:
                schema = schema_new
record = BinaryRecord.deserialize(schema, binary_records_buffer, record_header)
return record
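

# ---------------------------------------------------------------------------
# A minimal round-trip sketch (not part of the SDK API): it serializes a batch
# holding one BLOB record and reads it back. ``_SchemaObject`` is a
# hypothetical stand-in exposing only the attributes this module touches, and
# ``CompressFormat.NONE`` is assumed to be the "no compression" member of the
# enum imported above.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    class _SchemaObject:
        # no schema register configured, so version/schema lookups are skipped
        schema_register = None
        project = None
        topic = None

    schema_object = _SchemaObject()
    records = [BlobRecord(blob_data=b'hello batch')]
    payload = BatchSerializer.serialize(CompressFormat.NONE, schema_object, records)
    # init_schema=None marks a BLOB topic, so BlobRecords come back out
    restored = BatchSerializer.deserialize(None, schema_object, payload)
    assert restored[0].blob_data == b'hello batch'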