avro/src/ser_schema.rs (2,304 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Logic for serde-compatible schema-aware serialization //! which writes directly to a `Write` stream use crate::{ bigdecimal::big_decimal_as_bytes, encode::{encode_int, encode_long}, error::Error, schema::{Name, NamesRef, Namespace, RecordSchema, Schema}, }; use bigdecimal::BigDecimal; use serde::ser; use std::{borrow::Cow, io::Write, str::FromStr}; const RECORD_FIELD_INIT_BUFFER_SIZE: usize = 64; const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024; const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32; const SINGLE_VALUE_INIT_BUFFER_SIZE: usize = 128; /// The sequence serializer for [`SchemaAwareWriteSerializer`]. /// [`SchemaAwareWriteSerializeSeq`] may break large arrays up into multiple blocks to avoid having /// to obtain the length of the entire array before being able to write any data to the underlying /// [`std::fmt::Write`] stream. (See the /// [Data Serialization and Deserialization](https://avro.apache.org/docs/1.12.0/specification/#data-serialization-and-deserialization) /// section of the Avro spec for more info.) 
pub struct SchemaAwareWriteSerializeSeq<'a, 's, W: Write> { ser: &'a mut SchemaAwareWriteSerializer<'s, W>, item_schema: &'s Schema, item_buffer_size: usize, item_buffers: Vec<Vec<u8>>, bytes_written: usize, } impl<'a, 's, W: Write> SchemaAwareWriteSerializeSeq<'a, 's, W> { fn new( ser: &'a mut SchemaAwareWriteSerializer<'s, W>, item_schema: &'s Schema, len: Option<usize>, ) -> SchemaAwareWriteSerializeSeq<'a, 's, W> { SchemaAwareWriteSerializeSeq { ser, item_schema, item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE, item_buffers: Vec::with_capacity( len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY), ), bytes_written: 0, } } fn write_buffered_items(&mut self) -> Result<(), Error> { if !self.item_buffers.is_empty() { self.bytes_written += encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?; for item in self.item_buffers.drain(..) { self.bytes_written += self .ser .writer .write(item.as_slice()) .map_err(Error::WriteBytes)?; } } Ok(()) } fn serialize_element<T: ser::Serialize>(&mut self, value: &T) -> Result<(), Error> { let mut item_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size); let mut item_ser = SchemaAwareWriteSerializer::new( &mut item_buffer, self.item_schema, self.ser.names, self.ser.enclosing_namespace.clone(), ); value.serialize(&mut item_ser)?; self.item_buffer_size = std::cmp::max(self.item_buffer_size, item_buffer.len() + 16); self.item_buffers.push(item_buffer); if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT { self.write_buffered_items()?; } Ok(()) } fn end(mut self) -> Result<usize, Error> { self.write_buffered_items()?; self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Error::WriteBytes)?; Ok(self.bytes_written) } } impl<W: Write> ser::SerializeSeq for SchemaAwareWriteSerializeSeq<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_element(&value) } fn end(self) -> 
Result<Self::Ok, Self::Error> { self.end() } } impl<W: Write> ser::SerializeTuple for SchemaAwareWriteSerializeSeq<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { ser::SerializeSeq::serialize_element(self, value) } fn end(self) -> Result<Self::Ok, Self::Error> { ser::SerializeSeq::end(self) } } /// The map serializer for [`SchemaAwareWriteSerializer`]. /// [`SchemaAwareWriteSerializeMap`] may break large maps up into multiple blocks to avoid having to /// obtain the size of the entire map before being able to write any data to the underlying /// [`std::fmt::Write`] stream. (See the /// [Data Serialization and Deserialization](https://avro.apache.org/docs/1.12.0/specification/#data-serialization-and-deserialization) /// section of the Avro spec for more info.) pub struct SchemaAwareWriteSerializeMap<'a, 's, W: Write> { ser: &'a mut SchemaAwareWriteSerializer<'s, W>, item_schema: &'s Schema, item_buffer_size: usize, item_buffers: Vec<Vec<u8>>, bytes_written: usize, } impl<'a, 's, W: Write> SchemaAwareWriteSerializeMap<'a, 's, W> { fn new( ser: &'a mut SchemaAwareWriteSerializer<'s, W>, item_schema: &'s Schema, len: Option<usize>, ) -> SchemaAwareWriteSerializeMap<'a, 's, W> { SchemaAwareWriteSerializeMap { ser, item_schema, item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE, item_buffers: Vec::with_capacity( len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY), ), bytes_written: 0, } } fn write_buffered_items(&mut self) -> Result<(), Error> { if !self.item_buffers.is_empty() { self.bytes_written += encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?; for item in self.item_buffers.drain(..) 
{ self.bytes_written += self .ser .writer .write(item.as_slice()) .map_err(Error::WriteBytes)?; } } Ok(()) } } impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { let mut element_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size); let string_schema = Schema::String; let mut key_ser = SchemaAwareWriteSerializer::new( &mut element_buffer, &string_schema, self.ser.names, self.ser.enclosing_namespace.clone(), ); key.serialize(&mut key_ser)?; self.item_buffers.push(element_buffer); Ok(()) } fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { let last_index = self.item_buffers.len() - 1; let element_buffer = &mut self.item_buffers[last_index]; let mut val_ser = SchemaAwareWriteSerializer::new( element_buffer, self.item_schema, self.ser.names, self.ser.enclosing_namespace.clone(), ); value.serialize(&mut val_ser)?; self.item_buffer_size = std::cmp::max(self.item_buffer_size, element_buffer.len() + 16); if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT { self.write_buffered_items()?; } Ok(()) } fn end(mut self) -> Result<Self::Ok, Self::Error> { self.write_buffered_items()?; self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Error::WriteBytes)?; Ok(self.bytes_written) } } /// The struct serializer for [`SchemaAwareWriteSerializer`], which can serialize Avro records. /// [`SchemaAwareWriteSerializeStruct`] can accept fields out of order, but doing so incurs a /// performance penalty, since it requires [`SchemaAwareWriteSerializeStruct`] to buffer serialized /// values in order to write them to the stream in order. 
pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> { ser: &'a mut SchemaAwareWriteSerializer<'s, W>, record_schema: &'s RecordSchema, item_count: usize, buffered_fields: Vec<Option<Vec<u8>>>, bytes_written: usize, } impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> { fn new( ser: &'a mut SchemaAwareWriteSerializer<'s, W>, record_schema: &'s RecordSchema, len: usize, ) -> SchemaAwareWriteSerializeStruct<'a, 's, W> { SchemaAwareWriteSerializeStruct { ser, record_schema, item_count: 0, buffered_fields: vec![None; len], bytes_written: 0, } } fn serialize_next_field<T>(&mut self, value: &T) -> Result<(), Error> where T: ?Sized + ser::Serialize, { let next_field = self.record_schema.fields.get(self.item_count).expect( "Validity of the next field index was expected to have been checked by the caller", ); // If we receive fields in order, write them directly to the main writer let mut value_ser = SchemaAwareWriteSerializer::new( &mut *self.ser.writer, &next_field.schema, self.ser.names, self.ser.enclosing_namespace.clone(), ); self.bytes_written += value.serialize(&mut value_ser)?; self.item_count += 1; // Write any buffered data to the stream that has now become next in line while let Some(buffer) = self .buffered_fields .get_mut(self.item_count) .and_then(|b| b.take()) { self.bytes_written += self .ser .writer .write(buffer.as_slice()) .map_err(Error::WriteBytes)?; self.item_count += 1; } Ok(()) } fn end(self) -> Result<usize, Error> { if self.item_count != self.record_schema.fields.len() { Err(Error::GetField( self.record_schema.fields[self.item_count].name.clone(), )) } else { Ok(self.bytes_written) } } } impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { if self.item_count >= self.record_schema.fields.len() { return 
Err(Error::FieldName(String::from(key))); } let next_field = &self.record_schema.fields[self.item_count]; let next_field_matches = match &next_field.aliases { Some(aliases) => { key == next_field.name.as_str() || aliases.iter().any(|a| key == a.as_str()) } None => key == next_field.name.as_str(), }; if next_field_matches { self.serialize_next_field(&value).map_err(|e| { Error::SerializeRecordFieldWithSchema { field_name: key, record_schema: Schema::Record(self.record_schema.clone()), error: Box::new(e), } })?; Ok(()) } else { if self.item_count < self.record_schema.fields.len() { for i in self.item_count..self.record_schema.fields.len() { let field = &self.record_schema.fields[i]; let field_matches = match &field.aliases { Some(aliases) => { key == field.name.as_str() || aliases.iter().any(|a| key == a.as_str()) } None => key == field.name.as_str(), }; if field_matches { let mut buffer: Vec<u8> = Vec::with_capacity(RECORD_FIELD_INIT_BUFFER_SIZE); let mut value_ser = SchemaAwareWriteSerializer::new( &mut buffer, &field.schema, self.ser.names, self.ser.enclosing_namespace.clone(), ); value.serialize(&mut value_ser).map_err(|e| { Error::SerializeRecordFieldWithSchema { field_name: key, record_schema: Schema::Record(self.record_schema.clone()), error: Box::new(e), } })?; self.buffered_fields[i] = Some(buffer); return Ok(()); } } } Err(Error::FieldName(String::from(key))) } } fn end(self) -> Result<Self::Ok, Self::Error> { self.end() } } impl<W: Write> ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { ser::SerializeStruct::serialize_field(self, key, value) } fn end(self) -> Result<Self::Ok, Self::Error> { ser::SerializeStruct::end(self) } } /// The tuple struct serializer for [`SchemaAwareWriteSerializer`]. 
/// [`SchemaAwareWriteSerializeTupleStruct`] can serialize to an Avro array, record, or big-decimal. /// When serializing to a record, fields must be provided in the correct order, since no names are provided. pub enum SchemaAwareWriteSerializeTupleStruct<'a, 's, W: Write> { Record(SchemaAwareWriteSerializeStruct<'a, 's, W>), Array(SchemaAwareWriteSerializeSeq<'a, 's, W>), } impl<W: Write> SchemaAwareWriteSerializeTupleStruct<'_, '_, W> { fn serialize_field<T>(&mut self, value: &T) -> Result<(), Error> where T: ?Sized + ser::Serialize, { use SchemaAwareWriteSerializeTupleStruct::*; match self { Record(record_ser) => record_ser.serialize_next_field(&value), Array(array_ser) => array_ser.serialize_element(&value), } } fn end(self) -> Result<usize, Error> { use SchemaAwareWriteSerializeTupleStruct::*; match self { Record(record_ser) => record_ser.end(), Array(array_ser) => array_ser.end(), } } } impl<W: Write> ser::SerializeTupleStruct for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_field(&value) } fn end(self) -> Result<Self::Ok, Self::Error> { self.end() } } impl<W: Write> ser::SerializeTupleVariant for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> { type Ok = usize; type Error = Error; fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_field(&value) } fn end(self) -> Result<Self::Ok, Self::Error> { self.end() } } /// A [`serde::ser::Serializer`] implementation that serializes directly to a [`std::fmt::Write`] /// using the provided schema. If [`SchemaAwareWriteSerializer`] isn't able to match the incoming /// data with its schema, it will return an error. /// A [`SchemaAwareWriteSerializer`] instance can be re-used to serialize multiple values matching /// the schema to its [`std::fmt::Write`] stream. 
pub struct SchemaAwareWriteSerializer<'s, W: Write> { writer: &'s mut W, root_schema: &'s Schema, names: &'s NamesRef<'s>, enclosing_namespace: Namespace, } impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> { /// Create a new [`SchemaAwareWriteSerializer`]. /// /// `writer` is the [`std::fmt::Write`] stream to be written to. /// /// `schema` is the schema of the value to be written. /// /// `names` is the mapping of schema names to schemas, to be used for type reference lookups /// /// `enclosing_namespace` is the enclosing namespace to be used for type reference lookups pub fn new( writer: &'s mut W, schema: &'s Schema, names: &'s NamesRef<'s>, enclosing_namespace: Namespace, ) -> SchemaAwareWriteSerializer<'s, W> { SchemaAwareWriteSerializer { writer, root_schema: schema, names, enclosing_namespace, } } fn get_ref_schema(&self, name: &'s Name) -> Result<&'s Schema, Error> { let full_name = match name.namespace { Some(_) => Cow::Borrowed(name), None => Cow::Owned(Name { name: name.name.clone(), namespace: self.enclosing_namespace.clone(), }), }; let ref_schema = self.names.get(full_name.as_ref()).copied(); ref_schema.ok_or_else(|| Error::SchemaResolutionError(full_name.as_ref().clone())) } fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> { let mut bytes_written: usize = 0; bytes_written += encode_long(bytes.len() as i64, &mut self.writer)?; bytes_written += self.writer.write(bytes).map_err(Error::WriteBytes)?; Ok(bytes_written) } fn serialize_bool_with_schema(&mut self, value: bool, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "bool", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Boolean => self .writer .write(&[u8::from(value)]) .map_err(Error::WriteBytes), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Boolean => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_bool_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "No matching Schema::Bool found in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected {expected}. Got: Bool"))), } } fn serialize_i32_with_schema(&mut self, value: i32, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "int (i8 | i16 | i32)", value: format!("{value}. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Int | Schema::TimeMillis | Schema::Date => encode_int(value, &mut self.writer), Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_i32_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching int-like schema in {union_schema:?}" ))) } expected => Err(create_error(format!("Expected {expected}. 
Got: Int/Long"))), } } fn serialize_i64_with_schema(&mut self, value: i64, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "i64", value: format!("{value}. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { let int_value = i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?; encode_int(int_value, &mut self.writer) } Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => encode_long(value, &mut self.writer), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_i64_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching int/long-like schema in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))), } } fn serialize_u8_with_schema(&mut self, value: u8, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "u8", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { encode_int(value as i32, &mut self.writer) } Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer), Schema::Bytes => self.write_bytes(&[value]), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos | Schema::Bytes => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_u8_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!("Cannot find a matching Int-like, Long-like or Bytes schema in {union_schema:?}"))) } expected => Err(create_error(format!("Expected: {expected}. Got: Int"))), } } fn serialize_u32_with_schema(&mut self, value: u32, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "unsigned int (u16 | u32)", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { let int_value = i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?; encode_int(int_value, &mut self.writer) } Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_u32_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching Int-like or Long-like schema in {union_schema:?}" ))) } expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))), } } fn serialize_u64_with_schema(&mut self, value: u64, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "u64", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { let int_value = i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?; encode_int(int_value, &mut self.writer) } Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { let long_value = i64::try_from(value).map_err(|cause| create_error(cause.to_string()))?; encode_long(long_value, &mut self.writer) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_u64_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching Int-like or Long-like schema in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))), } } fn serialize_f32_with_schema(&mut self, value: f32, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "f32", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Float => self .writer .write(&value.to_le_bytes()) .map_err(Error::WriteBytes), Schema::Double => self .writer .write(&(value as f64).to_le_bytes()) .map_err(Error::WriteBytes), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Float | Schema::Double => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_f32_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a Float schema in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected: {expected}. Got: Float"))), } } fn serialize_f64_with_schema(&mut self, value: f64, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "f64", value: format!("{value}. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Float => self .writer .write(&(value as f32).to_le_bytes()) .map_err(Error::WriteBytes), Schema::Double => self .writer .write(&value.to_le_bytes()) .map_err(Error::WriteBytes), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Float | Schema::Double => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_f64_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a Double schema in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected: {expected}. Got: Double"))), } } fn serialize_char_with_schema(&mut self, value: char, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "char", value: format!("{value}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::String | Schema::Bytes => self.write_bytes(String::from(value).as_bytes()), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::String | Schema::Bytes => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_char_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching String or Bytes schema in {union_schema:?}" ))) } expected => Err(create_error(format!("Expected {expected}. Got: char"))), } } fn serialize_str_with_schema(&mut self, value: &str, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "string", value: format!("{value}. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::String | Schema::Bytes | Schema::Uuid => self.write_bytes(value.as_bytes()), Schema::BigDecimal => { // If we get a string for a `BigDecimal` type, expect a display string representation, such as "12.75" let decimal_val = BigDecimal::from_str(value).map_err(|e| create_error(e.to_string()))?; let decimal_bytes = big_decimal_as_bytes(&decimal_val)?; self.write_bytes(decimal_bytes.as_slice()) } Schema::Fixed(fixed_schema) => { if value.len() == fixed_schema.size { self.writer .write(value.as_bytes()) .map_err(Error::WriteBytes) } else { Err(create_error(format!( "Fixed schema size ({}) does not match the value length ({})", fixed_schema.size, value.len() ))) } } Schema::Ref { name } => { let ref_schema = self.get_ref_schema(name)?; self.serialize_str_with_schema(value, ref_schema) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::String | Schema::Bytes | Schema::Uuid | Schema::Fixed(_) | Schema::Ref { name: _ } => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_str_with_schema(value, 
variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Expected one of the union variants {:?}. Got: String", union_schema.schemas ))) } expected => Err(create_error(format!("Expected: {expected}. Got: String"))), } } fn serialize_bytes_with_schema( &mut self, value: &[u8], schema: &Schema, ) -> Result<usize, Error> { let create_error = |cause: String| { use std::fmt::Write; let mut v_str = String::with_capacity(value.len()); for b in value { if write!(&mut v_str, "{:x}", b).is_err() { v_str.push_str("??"); } } Error::SerializeValueWithSchema { value_type: "bytes", value: format!("{v_str}. Cause: {cause}"), schema: schema.clone(), } }; match schema { Schema::String | Schema::Bytes | Schema::Uuid | Schema::BigDecimal => { self.write_bytes(value) } Schema::Fixed(fixed_schema) => { if value.len() == fixed_schema.size { self.writer.write(value).map_err(Error::WriteBytes) } else { Err(create_error(format!("Fixed schema size ({}) does not match the value length ({})", fixed_schema.size, value.len()))) } } Schema::Duration => { if value.len() == 12 { self.writer.write(value).map_err(Error::WriteBytes) } else { Err(create_error(format!("Duration length must be 12! Got ({})", value.len()))) } } Schema::Decimal(decimal_schema) => match decimal_schema.inner.as_ref() { Schema::Bytes => self.write_bytes(value), Schema::Fixed(fixed_schema) => match fixed_schema.size.checked_sub(value.len()) { Some(pad) => { let pad_val = match value.len() { 0 => 0, _ => value[0], }; let padding = vec![pad_val; pad]; self.writer .write(padding.as_slice()) .map_err(Error::WriteBytes)?; self.writer.write(value).map_err(Error::WriteBytes) } None => Err(Error::CompareFixedSizes { size: fixed_schema.size, n: value.len(), }), }, unsupported => Err(create_error(format!("Decimal schema's inner should be Bytes or Fixed schema. 
Got: {unsupported}"))), }, Schema::Ref { name } => { let ref_schema = self.get_ref_schema(name)?; self.serialize_bytes_with_schema(value, ref_schema) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::String | Schema::Bytes | Schema::Uuid | Schema::BigDecimal | Schema::Fixed(_) | Schema::Duration | Schema::Decimal(_) | Schema::Ref { name: _ } => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_bytes_with_schema(value, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!("Cannot find a matching String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal or Ref schema in {union_schema:?}"))) } unsupported => Err(create_error(format!("Expected String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal, Ref or Union schema. Got: {unsupported}"))), } } fn serialize_none_with_schema(&mut self, schema: &Schema) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "none", value: format!("None. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Null => Ok(0), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Null => { return encode_int(i as i32, &mut *self.writer); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching Null schema in {:?}", union_schema.schemas ))) } expected => Err(create_error(format!("Expected: {expected}. Got: Null"))), } } fn serialize_some_with_schema<T>(&mut self, value: &T, schema: &Schema) -> Result<usize, Error> where T: ?Sized + ser::Serialize, { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "some", value: format!("Some(?). 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Null => { /* skip */ } _ => { encode_int(i as i32, &mut *self.writer)?; let mut variant_ser = SchemaAwareWriteSerializer::new( &mut *self.writer, variant_schema, self.names, self.enclosing_namespace.clone(), ); return value.serialize(&mut variant_ser); } } } Err(create_error(format!( "Cannot find a matching Null schema in {:?}", union_schema.schemas ))) } _ => value.serialize(self), } } fn serialize_unit_struct_with_schema( &mut self, name: &'static str, schema: &Schema, ) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "unit struct", value: format!("{name}. Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Record(sch) => match sch.fields.len() { 0 => Ok(0), too_many => Err(create_error(format!( "Too many fields: {}. Expected: 0", too_many ))), }, Schema::Null => Ok(0), Schema::Ref { name: ref_name } => { let ref_schema = self.get_ref_schema(ref_name)?; self.serialize_unit_struct_with_schema(name, ref_schema) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Record(record_schema) if record_schema.fields.is_empty() => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_unit_struct_with_schema(name, variant_schema); } Schema::Null | Schema::Ref { name: _ } => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_unit_struct_with_schema(name, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Cannot find a matching Null schema in {union_schema:?}" ))) } unsupported => Err(create_error(format!( "Expected Null or Union schema. 
Got: {unsupported}" ))), } } fn serialize_unit_variant_with_schema( &mut self, name: &'static str, variant_index: u32, variant: &'static str, schema: &Schema, ) -> Result<usize, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "unit variant", value: format!("{name}::{variant} (index={variant_index}). Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Enum(enum_schema) => { if variant_index as usize >= enum_schema.symbols.len() { return Err(create_error(format!( "Variant index out of bounds: {}. The Enum schema has '{}' symbols", variant_index, enum_schema.symbols.len() ))); } encode_int(variant_index as i32, &mut self.writer) } Schema::Union(union_schema) => { if variant_index as usize >= union_schema.schemas.len() { return Err(create_error(format!( "Variant index out of bounds: {}. The union schema has '{}' schemas", variant_index, union_schema.schemas.len() ))); } encode_int(variant_index as i32, &mut self.writer)?; self.serialize_unit_struct_with_schema( name, &union_schema.schemas[variant_index as usize], ) } Schema::Ref { name: ref_name } => { let ref_schema = self.get_ref_schema(ref_name)?; self.serialize_unit_variant_with_schema(name, variant_index, variant, ref_schema) } unsupported => Err(create_error(format!( "Unsupported schema: {:?}. 
Expected: Enum, Union or Ref", unsupported ))), } } fn serialize_newtype_struct_with_schema<T>( &mut self, _name: &'static str, value: &T, schema: &Schema, ) -> Result<usize, Error> where T: ?Sized + ser::Serialize, { let mut inner_ser = SchemaAwareWriteSerializer::new( &mut *self.writer, schema, self.names, self.enclosing_namespace.clone(), ); // Treat any newtype struct as a transparent wrapper around the contained type value.serialize(&mut inner_ser) } fn serialize_newtype_variant_with_schema<T>( &mut self, name: &'static str, variant_index: u32, variant: &'static str, value: &T, schema: &Schema, ) -> Result<usize, Error> where T: ?Sized + ser::Serialize, { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "newtype variant", value: format!("{name}::{variant}(?) (index={variant_index}). Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Union(union_schema) => { let variant_schema = union_schema .schemas .get(variant_index as usize) .ok_or_else(|| { create_error(format!( "No variant schema at position {variant_index} for {union_schema:?}" )) })?; encode_int(variant_index as i32, &mut self.writer)?; self.serialize_newtype_struct_with_schema(variant, value, variant_schema) } _ => Err(create_error(format!( "Expected Union schema. Got: {schema}" ))), } } fn serialize_seq_with_schema<'a>( &'a mut self, len: Option<usize>, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> { let create_error = |cause: String| { let len_str = len .map(|l| format!("{l}")) .unwrap_or_else(|| String::from("?")); Error::SerializeValueWithSchema { value_type: "sequence", value: format!("sequence (len={len_str}). 
Cause: {cause}"), schema: schema.clone(), } }; match schema { Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new( self, array_schema.items.as_ref(), len, )), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Array(_) => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_seq_with_schema(len, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Expected Array schema in {union_schema:?}" ))) } _ => Err(create_error(format!("Expected: {schema}. Got: Array"))), } } fn serialize_tuple_with_schema<'a>( &'a mut self, len: usize, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "tuple", value: format!("tuple (len={len}). Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new( self, array_schema.items.as_ref(), Some(len), )), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Array(_) => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_tuple_with_schema(len, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Expected Array schema in {union_schema:?}" ))) } _ => Err(create_error(format!("Expected: {schema}. Got: Array"))), } } fn serialize_tuple_struct_with_schema<'a>( &'a mut self, name: &'static str, len: usize, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "tuple struct", value: format!( "{name}({}). 
Cause: {cause}", vec!["?"; len].as_slice().join(",") ), schema: schema.clone(), }; match schema { Schema::Array(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Array( SchemaAwareWriteSerializeSeq::new(self, &sch.items, Some(len)), )), Schema::Record(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Record( SchemaAwareWriteSerializeStruct::new(self, sch, len), )), Schema::Ref { name: ref_name } => { let ref_schema = self.get_ref_schema(ref_name)?; self.serialize_tuple_struct_with_schema(name, len, ref_schema) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Record(inner) => { if inner.fields.len() == len { encode_int(i as i32, &mut *self.writer)?; return self.serialize_tuple_struct_with_schema( name, len, variant_schema, ); } } Schema::Array(_) | Schema::Ref { name: _ } => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_tuple_struct_with_schema( name, len, variant_schema, ); } _ => { /* skip */ } } } Err(create_error(format!( "Expected Record, Array or Ref schema in {union_schema:?}" ))) } _ => Err(create_error(format!( "Expected Record, Array, Ref or Union schema. Got: {schema}" ))), } } fn serialize_tuple_variant_with_schema<'a>( &'a mut self, name: &'static str, variant_index: u32, variant: &'static str, len: usize, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "tuple variant", value: format!( "{name}::{variant}({}) (index={variant_index}). 
Cause: {cause}", vec!["?"; len].as_slice().join(",") ), schema: schema.clone(), }; match schema { Schema::Union(union_schema) => { let variant_schema = union_schema .schemas .get(variant_index as usize) .ok_or_else(|| { create_error(format!( "Cannot find a variant at position {variant_index} in {union_schema:?}" )) })?; encode_int(variant_index as i32, &mut self.writer)?; self.serialize_tuple_struct_with_schema(variant, len, variant_schema) } _ => Err(create_error(format!( "Expected Union schema. Got: {schema}" ))), } } fn serialize_map_with_schema<'a>( &'a mut self, len: Option<usize>, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeMap<'a, 's, W>, Error> { let create_error = |cause: String| { let len_str = len .map(|l| format!("{}", l)) .unwrap_or_else(|| String::from("?")); Error::SerializeValueWithSchema { value_type: "map", value: format!("map (size={len_str}). Cause: {cause}"), schema: schema.clone(), } }; match schema { Schema::Map(map_schema) => Ok(SchemaAwareWriteSerializeMap::new( self, map_schema.types.as_ref(), len, )), Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Map(_) => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_map_with_schema(len, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Expected a Map schema in {union_schema:?}" ))) } _ => Err(create_error(format!( "Expected Map or Union schema. Got: {schema}" ))), } } fn serialize_struct_with_schema<'a>( &'a mut self, name: &'static str, len: usize, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "struct", value: format!("{name}{{ ... }}. 
Cause: {cause}"), schema: schema.clone(), }; match schema { Schema::Record(record_schema) => Ok(SchemaAwareWriteSerializeStruct::new( self, record_schema, len, )), Schema::Ref { name: ref_name } => { let ref_schema = self.get_ref_schema(ref_name)?; self.serialize_struct_with_schema(name, len, ref_schema) } Schema::Union(union_schema) => { for (i, variant_schema) in union_schema.schemas.iter().enumerate() { match variant_schema { Schema::Record(inner) if inner.fields.len() == len && inner.name.name == name => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_struct_with_schema(name, len, variant_schema); } Schema::Ref { name: _ } => { encode_int(i as i32, &mut *self.writer)?; return self.serialize_struct_with_schema(name, len, variant_schema); } _ => { /* skip */ } } } Err(create_error(format!( "Expected Record or Ref schema in {union_schema:?}" ))) } _ => Err(create_error(format!( "Expected Record, Ref or Union schema. Got: {schema}" ))), } } fn serialize_struct_variant_with_schema<'a>( &'a mut self, name: &'static str, variant_index: u32, variant: &'static str, len: usize, schema: &'s Schema, ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> { let create_error = |cause: String| Error::SerializeValueWithSchema { value_type: "struct variant", value: format!("{name}::{variant}{{ ... }} (size={len}. Cause: {cause})"), schema: schema.clone(), }; match schema { Schema::Union(union_schema) => { let variant_schema = union_schema .schemas .get(variant_index as usize) .ok_or_else(|| { create_error(format!( "Cannot find variant at position {variant_index} in {union_schema:?}" )) })?; encode_int(variant_index as i32, &mut self.writer)?; self.serialize_struct_with_schema(variant, len, variant_schema) } _ => Err(create_error(format!( "Expected Union schema. 
Got: {schema}" ))), } } } impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s, W> { type Ok = usize; type Error = Error; type SerializeSeq = SchemaAwareWriteSerializeSeq<'a, 's, W>; type SerializeTuple = SchemaAwareWriteSerializeSeq<'a, 's, W>; type SerializeTupleStruct = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>; type SerializeTupleVariant = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>; type SerializeMap = SchemaAwareWriteSerializeMap<'a, 's, W>; type SerializeStruct = SchemaAwareWriteSerializeStruct<'a, 's, W>; type SerializeStructVariant = SchemaAwareWriteSerializeStruct<'a, 's, W>; fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> { self.serialize_bool_with_schema(v, self.root_schema) } fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> { self.serialize_i32(v as i32) } fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> { self.serialize_i32(v as i32) } fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> { self.serialize_i32_with_schema(v, self.root_schema) } fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> { self.serialize_i64_with_schema(v, self.root_schema) } fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> { self.serialize_u8_with_schema(v, self.root_schema) } fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> { self.serialize_u32(v as u32) } fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> { self.serialize_u32_with_schema(v, self.root_schema) } fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> { self.serialize_u64_with_schema(v, self.root_schema) } fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> { self.serialize_f32_with_schema(v, self.root_schema) } fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> { self.serialize_f64_with_schema(v, self.root_schema) } fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> { self.serialize_char_with_schema(v, 
self.root_schema) } fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> { self.serialize_str_with_schema(v, self.root_schema) } fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> { self.serialize_bytes_with_schema(v, self.root_schema) } fn serialize_none(self) -> Result<Self::Ok, Self::Error> { self.serialize_none_with_schema(self.root_schema) } fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_some_with_schema(value, self.root_schema) } fn serialize_unit(self) -> Result<Self::Ok, Self::Error> { self.serialize_none() } fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> { self.serialize_unit_struct_with_schema(name, self.root_schema) } fn serialize_unit_variant( self, name: &'static str, variant_index: u32, variant: &'static str, ) -> Result<Self::Ok, Self::Error> { self.serialize_unit_variant_with_schema(name, variant_index, variant, self.root_schema) } fn serialize_newtype_struct<T>( self, name: &'static str, value: &T, ) -> Result<Self::Ok, Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_newtype_struct_with_schema(name, value, self.root_schema) } fn serialize_newtype_variant<T>( self, name: &'static str, variant_index: u32, variant: &'static str, value: &T, ) -> Result<Self::Ok, Self::Error> where T: ?Sized + ser::Serialize, { self.serialize_newtype_variant_with_schema( name, variant_index, variant, value, self.root_schema, ) } fn serialize_seq(self, len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> { self.serialize_seq_with_schema(len, self.root_schema) } fn serialize_tuple(self, len: usize) -> Result<Self::SerializeTuple, Self::Error> { self.serialize_tuple_with_schema(len, self.root_schema) } fn serialize_tuple_struct( self, name: &'static str, len: usize, ) -> Result<Self::SerializeTupleStruct, Self::Error> { self.serialize_tuple_struct_with_schema(name, len, self.root_schema) } fn 
/// A `Null` schema accepts unit and any `None`, rejects strings, and never
/// produces output bytes.
#[test]
fn test_serialize_null() -> TestResult {
    let null_schema = Schema::Null;
    let names = HashMap::new();
    let mut output: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut output, &null_schema, &names, None);

    // Values that map onto Avro null.
    ().serialize(&mut ser)?;
    None::<()>.serialize(&mut ser)?;
    None::<i32>.serialize(&mut ser)?;
    None::<String>.serialize(&mut ser)?;

    // Values that must be rejected.
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    let nothing: &[u8] = &[];
    assert_eq!(output.as_slice(), nothing);

    Ok(())
}
/// An `Int` schema accepts the small integer types (zig-zag encoded) and rejects
/// strings.
#[test]
fn test_serialize_int() -> TestResult {
    let int_schema = Schema::Int;
    let names = HashMap::new();
    let mut output: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut output, &int_schema, &names, None);

    // Unsigned inputs.
    4u8.serialize(&mut ser)?;
    31u16.serialize(&mut ser)?;
    13u32.serialize(&mut ser)?;
    // Signed inputs.
    7i8.serialize(&mut ser)?;
    (-57i16).serialize(&mut ser)?;
    129i32.serialize(&mut ser)?;

    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    let expected: &[u8] = &[8, 62, 26, 14, 113, 130, 2];
    assert_eq!(output.as_slice(), expected);

    Ok(())
}
/// A `Double` schema writes floating point values as 8 little-endian bytes and
/// rejects strings.
///
/// This test previously used `Schema::Float`, which made it an exact duplicate of
/// `test_serialize_float` and left the Double path untested.
#[test]
fn test_serialize_double() -> TestResult {
    let schema = Schema::Double;
    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    // NOTE(review): assumes an f32 is widened to f64 under a Double schema — confirm
    // against `serialize_f32_with_schema`.
    4.7f32.serialize(&mut serializer)?;
    (-14.1f64).serialize(&mut serializer)?;

    assert!("".serialize(&mut serializer).is_err());
    assert!(Some("").serialize(&mut serializer).is_err());

    assert_eq!(
        buffer.as_slice(),
        &[
            // 4.7f32 widened to f64: 0x4012_CCCC_C000_0000
            0, 0, 0, 192, 204, 204, 18, 64,
            // -14.1f64: 0xC02C_3333_3333_3333
            51, 51, 51, 51, 51, 51, 44, 192
        ]
    );

    Ok(())
}
"stringField", "type": "string"}, {"name": "intField", "type": "int"} ] }"#, )?; #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct GoodTestRecord { string_field: String, int_field: i32, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct BadTestRecord { foo_string_field: String, bar_int_field: i32, } let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); let good_record = GoodTestRecord { string_field: String::from("test"), int_field: 10, }; good_record.serialize(&mut serializer)?; let bad_record = BadTestRecord { foo_string_field: String::from("test"), bar_int_field: 10, }; assert!(bad_record.serialize(&mut serializer).is_err()); assert!("".serialize(&mut serializer).is_err()); assert!(Some("").serialize(&mut serializer).is_err()); assert_eq!(buffer.as_slice(), &[8, b't', b'e', b's', b't', 20]); Ok(()) } #[test] fn test_serialize_empty_record() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "record", "name": "EmptyRecord", "fields": [] }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); #[derive(Serialize)] struct EmptyRecord; EmptyRecord.serialize(&mut serializer)?; #[derive(Serialize)] struct NonEmptyRecord { foo: String, } let record = NonEmptyRecord { foo: "bar".to_string(), }; match record.serialize(&mut serializer) { Err(Error::FieldName(field_name)) if field_name == "foo" => (), unexpected => panic!("Expected an error. Got: {unexpected:?}"), } match ().serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "none"); // serialize_unit() delegates to serialize_none() assert_eq!(value, "None. Cause: Expected: Record. Got: Null"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. 
/// An `Enum` schema writes unit variants as their index and rejects `None`.
#[test]
fn test_serialize_enum() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "enum",
            "name": "Suit",
            "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
        }"#,
    )?;

    #[derive(Serialize)]
    enum Suit {
        Spades,
        Hearts,
        Diamonds,
        Clubs,
    }

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    Suit::Spades.serialize(&mut serializer)?;
    Suit::Hearts.serialize(&mut serializer)?;
    Suit::Diamonds.serialize(&mut serializer)?;
    Suit::Clubs.serialize(&mut serializer)?;

    match None::<()>.serialize(&mut serializer) {
        Err(Error::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: reusing `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Enum. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.as_slice(), &[0, 2, 4, 6]);

    Ok(())
}
/// A `Map` schema writes string-keyed entries as one block followed by the end
/// marker, and rejects values that do not fit the values schema.
#[test]
fn test_serialize_map() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "map",
            "values": "long"
        }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let mut map: BTreeMap<String, i64> = BTreeMap::new();
    map.insert(String::from("item1"), 10);
    map.insert(String::from("item2"), 400);

    map.serialize(&mut serializer)?;

    let mut map: BTreeMap<String, &str> = BTreeMap::new();
    map.insert(String::from("item1"), "value1");

    match map.serialize(&mut serializer) {
        Err(Error::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: reusing `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "string");
            assert_eq!(value, "value1. Cause: Expected: Long. Got: String");
            // The failure is raised for the map *value*, so per the message above the
            // error is expected to carry the values schema rather than the map schema.
            assert_eq!(error_schema, Schema::Long);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(
        buffer.as_slice(),
        &[
            4, 10, b'i', b't', b'e', b'm', b'1', 20, 10, b'i', b't', b'e', b'm', b'2', 160, 6,
            0
        ]
    );

    Ok(())
}
/// A three-branch union writes the branch index followed by the value, and
/// rejects values for which the union has no branch.
#[test]
fn test_serialize_union() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": ["null", "long", "string"]
        }"#,
    )?;

    #[derive(Serialize)]
    enum LongOrString {
        Null,
        Long(i64),
        Str(String),
    }

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    LongOrString::Null.serialize(&mut serializer)?;
    LongOrString::Long(400).serialize(&mut serializer)?;
    LongOrString::Str(String::from("test")).serialize(&mut serializer)?;

    match 1_f64.serialize(&mut serializer) {
        Err(Error::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: reusing `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f64");
            assert_eq!(
                value,
                "1. Cause: Cannot find a Double schema in [Null, Long, String]"
            );
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(
        buffer.as_slice(),
        &[0, 2, 160, 6, 4, 8, b't', b'e', b's', b't']
    );

    Ok(())
}
Got: {unexpected:?}"), } // array match [1; 8].serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "tuple"); // TODO: why is this 'tuple' ?! assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. Got: {unexpected:?}"), } // slice match &[1, 2, 3, 4, 5, 6, 7, 8].serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(*value_type, "tuple"); // TODO: why is this 'tuple' ?! assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. Got: {unexpected:?}"), } assert_eq!(buffer.as_slice(), &[10, 124, 31, 97, 14, 201, 3, 88]); Ok(()) } #[test] fn test_serialize_decimal_bytes() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "bytes", "logicalType": "decimal", "precision": 16, "scale": 2 }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); let val = Decimal::from(&[251, 155]); val.serialize(&mut serializer)?; match ().serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "none"); assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. 
/// A fixed-backed decimal schema writes the raw bytes of a `Decimal` without a
/// length prefix and rejects the unit value.
#[test]
fn test_serialize_decimal_fixed() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "fixed",
            "name": "FixedDecimal",
            "size": 8,
            "logicalType": "decimal",
            "precision": 16,
            "scale": 2
        }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let val = Decimal::from(&[0, 0, 0, 0, 0, 0, 251, 155]);
    val.serialize(&mut serializer)?;

    match ().serialize(&mut serializer) {
        Err(Error::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: reusing `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.as_slice(), &[0, 0, 0, 0, 0, 0, 251, 155]);

    Ok(())
}
.serialize(&mut serializer)?; match 1_u8.serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "u8"); assert_eq!(value, "1. Cause: Expected: Uuid. Got: Int"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. Got: {unexpected:?}"), } assert_eq!( buffer.as_slice(), &[ 72, b'8', b'c', b'2', b'8', b'd', b'a', b'8', b'1', b'-', b'2', b'3', b'8', b'c', b'-', b'4', b'3', b'2', b'6', b'-', b'b', b'd', b'd', b'd', b'-', b'4', b'e', b'3', b'd', b'0', b'0', b'c', b'c', b'5', b'0', b'9', b'9' ] ); Ok(()) } #[test] fn test_serialize_date() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "int", "logicalType": "date" }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); 100_u8.serialize(&mut serializer)?; 1000_u16.serialize(&mut serializer)?; 10000_u32.serialize(&mut serializer)?; 1000_i16.serialize(&mut serializer)?; 10000_i32.serialize(&mut serializer)?; match 10000_f32.serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "f32"); assert_eq!(value, "10000. Cause: Expected: Date. Got: Float"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. 
Got: {unexpected:?}"), } assert_eq!( buffer.as_slice(), &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1] ); Ok(()) } #[test] fn test_serialize_time_millis() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "int", "logicalType": "time-millis" }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); 100_u8.serialize(&mut serializer)?; 1000_u16.serialize(&mut serializer)?; 10000_u32.serialize(&mut serializer)?; 1000_i16.serialize(&mut serializer)?; 10000_i32.serialize(&mut serializer)?; match 10000_f32.serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "f32"); assert_eq!(value, "10000. Cause: Expected: TimeMillis. Got: Float"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. Got: {unexpected:?}"), } assert_eq!( buffer.as_slice(), &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1] ); Ok(()) } #[test] fn test_serialize_time_micros() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "long", "logicalType": "time-micros" }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); 100_u8.serialize(&mut serializer)?; 1000_u16.serialize(&mut serializer)?; 10000_u32.serialize(&mut serializer)?; 1000_i16.serialize(&mut serializer)?; 10000_i32.serialize(&mut serializer)?; 10000_i64.serialize(&mut serializer)?; match 10000_f32.serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "f32"); assert_eq!(value, "10000. Cause: Expected: TimeMicros. Got: Float"); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. 
Got: {unexpected:?}"), } assert_eq!( buffer.as_slice(), &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1] ); Ok(()) } #[test] fn test_serialize_timestamp() -> TestResult { for precision in ["millis", "micros", "nanos"] { let schema = Schema::parse_str(&format!( r#"{{ "type": "long", "logicalType": "timestamp-{precision}" }}"# ))?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); 100_u8.serialize(&mut serializer)?; 1000_u16.serialize(&mut serializer)?; 10000_u32.serialize(&mut serializer)?; 1000_i16.serialize(&mut serializer)?; 10000_i32.serialize(&mut serializer)?; 10000_i64.serialize(&mut serializer)?; match 10000_f64.serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { let mut capital_precision = precision.to_string(); if let Some(c) = capital_precision.chars().next() { capital_precision.replace_range(..1, &c.to_uppercase().to_string()); } assert_eq!(value_type, "f64"); assert_eq!( value, format!( "10000. Cause: Expected: Timestamp{}. Got: Double", capital_precision ) ); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. 
Got: {unexpected:?}"), } assert_eq!( buffer.as_slice(), &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1] ); } Ok(()) } #[test] fn test_serialize_duration() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "fixed", "size": 12, "name": "duration", "logicalType": "duration" }"#, )?; let mut buffer: Vec<u8> = Vec::new(); let names = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None); let duration_bytes = ByteArray::new(Duration::new(Months::new(3), Days::new(2), Millis::new(1200)).into()); duration_bytes.serialize(&mut serializer)?; match [1; 12].serialize(&mut serializer) { Err(Error::SerializeValueWithSchema { value_type, value, schema, }) => { assert_eq!(value_type, "tuple"); // TODO: why is this 'tuple' ?! assert_eq!( value, "tuple (len=12). Cause: Expected: Duration. Got: Array" ); assert_eq!(schema, schema); } unexpected => panic!("Expected an error. Got: {unexpected:?}"), } assert_eq!(buffer.as_slice(), &[3, 0, 0, 0, 2, 0, 0, 0, 176, 4, 0, 0]); Ok(()) } #[test] #[serial(serde_is_human_readable)] // for BigDecimal and Uuid fn test_serialize_recursive_record() -> TestResult { let schema = Schema::parse_str( r#"{ "type": "record", "name": "TestRecord", "fields": [ {"name": "stringField", "type": "string"}, {"name": "intField", "type": "int"}, {"name": "bigDecimalField", "type": {"type": "bytes", "logicalType": "big-decimal"}}, {"name": "uuidField", "type": "fixed", "size": 16, "logicalType": "uuid"}, {"name": "innerRecord", "type": ["null", "TestRecord"]} ] }"#, )?; #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct TestRecord { string_field: String, int_field: i32, big_decimal_field: BigDecimal, uuid_field: Uuid, // #[serde(skip_serializing_if = "Option::is_none")] => Never ignore None! 
inner_record: Option<Box<TestRecord>>, } crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release); let mut buffer: Vec<u8> = Vec::new(); let rs = ResolvedSchema::try_from(&schema)?; let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, rs.get_names(), None); let good_record = TestRecord { string_field: String::from("test"), int_field: 10, big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2), uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5098".parse::<Uuid>()?, inner_record: Some(Box::new(TestRecord { string_field: String::from("inner_test"), int_field: 100, big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![20038]), 2), uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5099".parse::<Uuid>()?, inner_record: None, })), }; good_record.serialize(&mut serializer)?; assert_eq!( buffer.as_slice(), &[ 8, 116, 101, 115, 116, 20, 10, 6, 0, 195, 104, 4, 72, 56, 99, 50, 56, 100, 97, 56, 49, 45, 50, 51, 56, 99, 45, 52, 51, 50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51, 100, 48, 48, 99, 99, 53, 48, 57, 56, 2, 20, 105, 110, 110, 101, 114, 95, 116, 101, 115, 116, 200, 1, 8, 4, 78, 70, 4, 72, 56, 99, 50, 56, 100, 97, 56, 49, 45, 50, 51, 56, 99, 45, 52, 51, 50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51, 100, 48, 48, 99, 99, 53, 48, 57, 57, 0 ] ); Ok(()) } }