avro/src/headers.rs (112 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Handling of Avro magic headers use uuid::Uuid; use crate::{rabin::Rabin, schema::SchemaFingerprint, AvroResult, Schema}; /// This trait represents that an object is able to construct an Avro message header. It is /// implemented for some known header types already. If you need a header type that is not already /// included here, then you can create your own struct and implement this trait. pub trait HeaderBuilder { fn build_header(&self) -> Vec<u8>; } /// HeaderBuilder based on the Rabin schema fingerprint /// /// This is the default and will be used automatically by the `new` impls in /// [crate::reader::GenericSingleObjectReader] and [crate::writer::GenericSingleObjectWriter]. pub struct RabinFingerprintHeader { fingerprint: SchemaFingerprint, } impl RabinFingerprintHeader { /// Use this helper to build an instance from an existing Avro `Schema`. pub fn from_schema(schema: &Schema) -> Self { let fingerprint = schema.fingerprint::<Rabin>(); RabinFingerprintHeader { fingerprint } } } impl HeaderBuilder for RabinFingerprintHeader { fn build_header(&self) -> Vec<u8> { let bytes = &self.fingerprint.bytes; vec![ 0xC3, 0x01, bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], ] } } /// HeaderBuilder based on /// [Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) schema UUID /// /// See the function docs for usage details pub struct GlueSchemaUuidHeader { schema_uuid: Uuid, } impl GlueSchemaUuidHeader { /// Create an instance of the struct from a Glue Schema UUID /// /// Code for writing messages will most likely want to use this. You will need to determine /// via other means the correct Glue schema UUID and use it with this method to be able to /// create Avro-encoded messages with the correct headers. pub fn from_uuid(schema_uuid: Uuid) -> Self { GlueSchemaUuidHeader { schema_uuid } } /// The minimum length of a Glue header. /// 2 bytes for the special prefix (3, 0) plus /// 16 bytes for the Uuid const GLUE_HEADER_LENGTH: usize = 18; /// Create an instance of the struct based on parsing the UUID out of the header of a raw /// message /// /// Code for reading messages will most likely want to use this. Once you receive the raw bytes /// of a message, use this function to build the struct from it. That struct can then be used /// with the below `schema_uuid` function to retrieve the UUID in order to retrieve the correct /// schema for the message. You can then use the raw message, the schema, and the struct /// instance to read the message. pub fn parse_from_raw_avro(message_payload: &[u8]) -> AvroResult<Self> { if message_payload.len() < Self::GLUE_HEADER_LENGTH { return Err(crate::error::Error::HeaderMagic); } let schema_uuid = Uuid::from_slice(&message_payload[2..18]).map_err(crate::Error::UuidFromSlice)?; Ok(GlueSchemaUuidHeader { schema_uuid }) } /// Retrieve the UUID from the object /// /// This is most useful in conjunction with the `parse_from_raw_avro` function to retrieve the /// actual UUID from the raw data of a received message. pub fn schema_uuid(&self) -> Uuid { self.schema_uuid } } impl HeaderBuilder for GlueSchemaUuidHeader { fn build_header(&self) -> Vec<u8> { let mut output_vec: Vec<u8> = vec![3, 0]; output_vec.extend_from_slice(self.schema_uuid.as_bytes()); output_vec } } #[cfg(test)] mod test { use super::*; use apache_avro_test_helper::TestResult; #[test] fn test_rabin_fingerprint_header() -> TestResult { let schema_str = r#" { "type": "record", "name": "test", "fields": [ { "name": "a", "type": "long", "default": 42 }, { "name": "b", "type": "string" } ] } "#; let schema = Schema::parse_str(schema_str)?; let header_builder = RabinFingerprintHeader::from_schema(&schema); let computed_header = header_builder.build_header(); let expected_header: Vec<u8> = vec![195, 1, 232, 198, 194, 12, 97, 95, 44, 71]; assert_eq!(computed_header, expected_header); Ok(()) } #[test] fn test_glue_schema_header() -> TestResult { let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid); let computed_header = header_builder.build_header(); let expected_header: Vec<u8> = vec![ 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, ]; assert_eq!(computed_header, expected_header); Ok(()) } #[test] fn test_glue_header_parse() -> TestResult { let incoming_avro_message: Vec<u8> = vec![ 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, 65, 65, 65, ]; let header_builder = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_avro_message)?; let expected_schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?; assert_eq!(header_builder.schema_uuid(), expected_schema_uuid); Ok(()) } #[test] fn test_glue_header_parse_err_on_message_too_short() -> TestResult { let incoming_message: Vec<u8> = vec![3, 0, 178, 241, 207, 0, 4, 52, 1]; let header_builder_res = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_message); assert!(matches!( header_builder_res, Err(crate::error::Error::HeaderMagic) )); Ok(()) } }