python/pyfury/format/encoder.py (35 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import pyfury import pyfury.type class Encoder: def __init__(self, clz=None, schema=None): """ A pojo class whose schema can be inferred by `pyfury.infer_schema` """ self.clz = clz self.schema = schema or pyfury.format.infer.infer_schema(clz) self.row_encoder = pyfury.create_row_encoder(self.schema) self.schema_hash: bytes = pyfury.format.infer.compute_schema_hash(self.schema) def encode(self, obj): row = self.row_encoder.to_row(obj) buffer = pyfury.Buffer.allocate(8 + row.size_bytes()) buffer.write_int64(self.schema_hash) row_bytes = row.to_bytes() buffer.write_bytes(row_bytes) return buffer.to_bytes(0, buffer.writer_index) def decode(self, binary: bytes): buf = pyfury.Buffer(binary, 0, len(binary)) peer_hash = buf.read_int64() assert self.schema_hash == peer_hash, ( f"Schema is not consistent, encoder schema is {self.schema}, " f"clz is {self.clz}. Self/peer schema hash is " f"{self.schema_hash, peer_hash}. " f"Please check writer schema." ) buf = pyfury.Buffer(binary, 8, len(binary) - 8) row = pyfury.RowData(self.schema, buf) return self.row_encoder.from_row(row) def to_row(self, obj): return self.row_encoder.to_row(obj) def from_row(self, binary: bytes): buf = pyfury.Buffer(binary, 0, len(binary)) row = pyfury.RowData(self.schema, buf) return self.row_encoder.from_row(row) def encoder(clz=None, schema=None): """A pojo class whose schema can be inferred by `pyfury.infer_schema`""" return Encoder(clz=clz, schema=schema)