def convert_byte_to_batch_record()

in datahub/batch/batch_serializer.py [0:0]


    def convert_byte_to_batch_record(init_schema, schema_object, byte_data):
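        # byte_data holds the full serialized batch: a fixed-size batch header
        # followed by the (possibly compressed) record payload. init_schema and
        # schema_object are passed straight through to the per-record deserializer.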
        # Deserialize the batch header
        batch_header_byte = byte_data[:BATCH_HEAD_SIZE]
        batch_header = BatchHeader.deserialize(batch_header_byte)

        # Check crc
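        # crcmod's predefined 'crc-32c' function returns the checksum as an int;
        # masking with 0xffffffff keeps it an unsigned 32-bit value that can be
        # compared with the crc32 field stored in the batch header.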
        crc32c = crcmod.predefined.mkCrcFun('crc-32c')
        compute_crc32 = crc32c(byte_data[BATCH_HEAD_SIZE:]) & 0xffffffff
        if batch_header.crc32 != compute_crc32:
            raise DatahubException("Check crc fail. expect: {}, real: {}".format(batch_header.crc32, compute_crc32))

        # Check length
        if batch_header.length != len(byte_data):
            raise DatahubException(
                "Check batch header length fail. expect: {}, real: {}".format(batch_header.length, len(byte_data)))

        # Decompress
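        # The low two bits of the attributes field select the compression format;
        # raw_size is the expected size of the payload after decompression.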
        compress_type = CompressFormat.get_compress(batch_header.attributes & 0x03)
        all_binary_buffer = byte_data[BATCH_HEAD_SIZE:]
        data_decompressor = get_compressor(compress_type)
        all_binary_buffer = data_decompressor.decompress(all_binary_buffer, batch_header.raw_size)

        # Deserialize the payload into a list of BinaryRecord objects
        batch_records = BatchBinaryRecord()
        next_pos = 0
        for index in range(batch_header.record_count):
            # Deserialize the record header first
            record_header = RecordHeader.deserialize(all_binary_buffer[next_pos: next_pos + RECORD_HEADER_SIZE])
            total_size = record_header.total_size
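            # total_size covers the record header plus its payload, so it is both
            # the slice length for this record and the offset to the next one.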

            binary_record = BatchSerializer.convert_byte_to_binary_record(
                init_schema, schema_object,
                all_binary_buffer[next_pos:next_pos + total_size], record_header)
            next_pos += total_size
            batch_records.add_record(binary_record)
        return batch_records
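
A minimal usage sketch is shown below. The call itself matches the signature above, but everything else (where the bytes come from, the meaning of init_schema, the record_schema variable) is an assumption for illustration, not part of the excerpt.

    # Hypothetical caller; only convert_byte_to_batch_record is taken from the excerpt above.
    raw_bytes = fetch_serialized_batch()     # assumed helper returning the serialized batch as bytes
    batch = BatchSerializer.convert_byte_to_batch_record(
        init_schema=True,                    # assumed to mean "the schema is already initialized"
        schema_object=record_schema,         # assumed schema describing the records in the batch
        byte_data=raw_bytes)
    # batch is a BatchBinaryRecord containing one BinaryRecord per record in the batch.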