in datahub/batch/batch_serializer.py [0:0]
def convert_byte_to_batch_record(init_schema, schema_object, byte_data):
    """Decode a serialized batch into a ``BatchBinaryRecord``.

    The first ``BATCH_HEAD_SIZE`` bytes of ``byte_data`` are parsed as the
    batch header; everything after them is the (possibly compressed)
    payload. The payload is verified against the header's CRC-32C, the
    total length is verified against the header's ``length`` field, the
    payload is decompressed, and then ``record_count`` records are read
    back-to-back from the decompressed buffer.

    Args:
        init_schema: Forwarded unchanged to
            ``BatchSerializer.convert_byte_to_binary_record``.
        schema_object: Forwarded unchanged to
            ``BatchSerializer.convert_byte_to_binary_record``.
        byte_data: The whole serialized batch (header followed by payload).

    Returns:
        A ``BatchBinaryRecord`` holding every decoded record, in the order
        they appear in the payload.

    Raises:
        DatahubException: If the payload CRC does not match the header, or
            if ``len(byte_data)`` differs from the header's ``length``.
    """
    header = BatchHeader.deserialize(byte_data[:BATCH_HEAD_SIZE])
    payload = byte_data[BATCH_HEAD_SIZE:]

    # Integrity check: the CRC covers the payload only, not the header.
    checksum_fn = crcmod.predefined.mkCrcFun('crc-32c')
    actual_crc = checksum_fn(payload) & 0xffffffff
    if actual_crc != header.crc32:
        raise DatahubException(
            "Check crc fail. expect: {}, real: {}".format(header.crc32, actual_crc))

    # Size check: the header's length is compared to the full input,
    # header bytes included.
    actual_length = len(byte_data)
    if actual_length != header.length:
        raise DatahubException(
            "Check batch header length fail. expect: {}, real: {}".format(header.length, actual_length))

    # The low two bits of the attributes field select the compression format.
    compress_type = CompressFormat.get_compress(header.attributes & 0x03)
    decompressor = get_compressor(compress_type)
    raw_records = decompressor.decompress(payload, header.raw_size)

    # Walk the decompressed buffer; each record header carries the total
    # size of its record, which tells us where the next record begins.
    result = BatchBinaryRecord()
    offset = 0
    for _ in range(header.record_count):
        record_header = RecordHeader.deserialize(
            raw_records[offset: offset + RECORD_HEADER_SIZE])
        record_end = offset + record_header.total_size
        record = BatchSerializer.convert_byte_to_binary_record(
            init_schema, schema_object, raw_records[offset:record_end], record_header)
        result.add_record(record)
        offset = record_end
    return result