avro/benches/serde.rs (427 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use apache_avro::{ schema::Schema, types::{Record, Value}, AvroResult, Reader, Writer, }; use criterion::{criterion_group, criterion_main, Criterion}; use serde::Serialize; use std::time::Duration; const RAW_SMALL_SCHEMA: &str = r#" { "namespace": "test", "type": "record", "name": "Test", "fields": [ { "type": { "type": "string" }, "name": "field" } ] } "#; #[derive(Serialize, Clone)] struct SmallRecord { field: String, } const RAW_BIG_SCHEMA: &str = r#" { "namespace": "my.example", "type": "record", "name": "userInfo", "fields": [ { "default": null, "type": ["null", "string"], "name": "username" }, { "default": -1, "type": "int", "name": "age" }, { "default": null, "type": ["null", "string"], "name": "phone" }, { "default": null, "type": ["null", "string"], "name": "housenum" }, { "default": {}, "type": { "fields": [ { "default": "NONE", "type": "string", "name": "street" }, { "default": "NONE", "type": "string", "name": "city" }, { "default": "NONE", "type": "string", "name": "state_prov" }, { "default": "NONE", "type": "string", "name": "country" }, { "default": "NONE", "type": "string", "name": "zip" } ], "type": "record", "name": "mailing_address" }, "name": "address" } ] } "#; #[derive(Serialize, Clone)] struct MailingAddress { street: String, city: String, state_prov: String, country: String, zip: String, } #[derive(Serialize, Clone)] struct BigRecord { username: Option<String>, age: i32, phone: Option<String>, housenum: Option<String>, address: MailingAddress, } const RAW_ADDRESS_SCHEMA: &str = r#" { "fields": [ { "default": "NONE", "type": "string", "name": "street" }, { "default": "NONE", "type": "string", "name": "city" }, { "default": "NONE", "type": "string", "name": "state_prov" }, { "default": "NONE", "type": "string", "name": "country" }, { "default": "NONE", "type": "string", "name": "zip" } ], "type": "record", "name": "mailing_address" } "#; fn make_small_record() -> anyhow::Result<(Schema, Value)> { let small_schema = Schema::parse_str(RAW_SMALL_SCHEMA)?; let small_record = { let mut small_record = Record::new(&small_schema).unwrap(); small_record.put("field", "foo"); small_record.into() }; Ok((small_schema, small_record)) } fn make_small_record_ser() -> anyhow::Result<(Schema, SmallRecord)> { let small_schema = Schema::parse_str(RAW_SMALL_SCHEMA)?; let small_record = SmallRecord { field: String::from("foo"), }; Ok((small_schema, small_record)) } fn make_big_record() -> anyhow::Result<(Schema, Value)> { let big_schema = Schema::parse_str(RAW_BIG_SCHEMA)?; let address_schema = Schema::parse_str(RAW_ADDRESS_SCHEMA)?; let mut address = Record::new(&address_schema).unwrap(); address.put("street", "street"); address.put("city", "city"); address.put("state_prov", "state_prov"); address.put("country", "country"); address.put("zip", "zip"); let big_record = { let mut big_record = Record::new(&big_schema).unwrap(); big_record.put( "username", Value::Union(1, Box::new(Value::String("username".to_owned()))), ); big_record.put("age", 10i32); big_record.put( "phone", Value::Union(1, Box::new(Value::String("000000000".to_owned()))), ); big_record.put( "housenum", Value::Union(1, Box::new(Value::String("0000".to_owned()))), ); big_record.put("address", address); big_record.into() }; Ok((big_schema, big_record)) } fn make_big_record_ser() -> anyhow::Result<(Schema, BigRecord)> { let big_schema = Schema::parse_str(RAW_BIG_SCHEMA)?; let big_record = BigRecord { username: Some(String::from("username")), age: 10, phone: Some(String::from("000000000")), housenum: Some(String::from("0000")), address: MailingAddress { street: String::from("street"), city: String::from("city"), state_prov: String::from("state_prov"), country: String::from("country"), zip: String::from("zip"), }, }; Ok((big_schema, big_record)) } fn make_records(record: Value, count: usize) -> Vec<Value> { std::iter::repeat(record).take(count).collect() } fn make_records_ser<T: Serialize + Clone>(record: T, count: usize) -> Vec<T> { std::iter::repeat(record).take(count).collect() } fn write(schema: &Schema, records: &[Value]) -> AvroResult<Vec<u8>> { let mut writer = Writer::new(schema, Vec::new()); writer.extend_from_slice(records).unwrap(); writer.into_inner() } fn write_ser<T: Serialize>(schema: &Schema, records: &[T]) -> AvroResult<Vec<u8>> { let mut writer = Writer::new(schema, Vec::new()); writer.extend_ser(records)?; writer.into_inner() } fn read(schema: &Schema, bytes: &[u8]) -> anyhow::Result<()> { let reader = Reader::with_schema(schema, bytes)?; for record in reader { let _ = record?; } Ok(()) } fn read_schemaless(bytes: &[u8]) -> anyhow::Result<()> { let reader = Reader::new(bytes)?; for record in reader { let _ = record?; } Ok(()) } fn bench_write( c: &mut Criterion, make_record: impl Fn() -> anyhow::Result<(Schema, Value)>, n_records: usize, name: &str, ) -> anyhow::Result<()> { let (schema, record) = make_record()?; let records = make_records(record, n_records); c.bench_function(name, |b| b.iter(|| write(&schema, &records))); Ok(()) } fn bench_write_ser<T: Serialize + Clone>( c: &mut Criterion, make_record: impl Fn() -> anyhow::Result<(Schema, T)>, n_records: usize, name: &str, ) -> anyhow::Result<()> { let (schema, record) = make_record()?; let records = make_records_ser(record, n_records); c.bench_function(name, |b| b.iter(|| write_ser(&schema, &records))); Ok(()) } fn bench_read( c: &mut Criterion, make_record: impl Fn() -> anyhow::Result<(Schema, Value)>, n_records: usize, name: &str, ) -> anyhow::Result<()> { let (schema, record) = make_record()?; let records = make_records(record, n_records); let bytes = write(&schema, &records).unwrap(); c.bench_function(name, |b| b.iter(|| read(&schema, &bytes))); Ok(()) } fn bench_from_file(c: &mut Criterion, file_path: &str, name: &str) -> anyhow::Result<()> { let bytes = std::fs::read(file_path)?; c.bench_function(name, |b| b.iter(|| read_schemaless(&bytes))); Ok(()) } fn bench_small_schema_write_1_record(c: &mut Criterion) { bench_write(c, make_small_record, 1, "small schema, write 1 record").unwrap(); } fn bench_small_schema_write_1_record_ser(c: &mut Criterion) { bench_write_ser( c, make_small_record_ser, 1, "small schema, write 1 record (serde way)", ) .unwrap(); } fn bench_small_schema_write_100_record(c: &mut Criterion) { bench_write(c, make_small_record, 100, "small schema, write 100 records").unwrap(); } fn bench_small_schema_write_100_record_ser(c: &mut Criterion) { bench_write_ser( c, make_small_record_ser, 100, "small schema, write 100 records (serde way)", ) .unwrap(); } fn bench_small_schema_write_10_000_record(c: &mut Criterion) { bench_write( c, make_small_record, 10_000, "small schema, write 10k records", ) .unwrap(); } fn bench_small_schema_write_10_000_record_ser(c: &mut Criterion) { bench_write_ser( c, make_small_record_ser, 10_000, "small schema, write 10k records (serde way)", ) .unwrap() } fn bench_small_schema_read_1_record(c: &mut Criterion) { bench_read(c, make_small_record, 1, "small schema, read 1 record").unwrap(); } fn bench_small_schema_read_100_record(c: &mut Criterion) { bench_read(c, make_small_record, 100, "small schema, read 100 records").unwrap(); } fn bench_small_schema_read_10_000_record(c: &mut Criterion) { bench_read( c, make_small_record, 10_000, "small schema, read 10k records", ) .unwrap(); } fn bench_big_schema_write_1_record(c: &mut Criterion) { bench_write(c, make_big_record, 1, "big schema, write 1 record").unwrap(); } fn bench_big_schema_write_1_record_ser(c: &mut Criterion) { bench_write_ser( c, make_big_record_ser, 1, "big schema, write 1 record (serde way)", ) .unwrap(); } fn bench_big_schema_write_100_record(c: &mut Criterion) { bench_write(c, make_big_record, 100, "big schema, write 100 records").unwrap(); } fn bench_big_schema_write_100_record_ser(c: &mut Criterion) { bench_write_ser( c, make_big_record_ser, 100, "big schema, write 100 records (serde way)", ) .unwrap(); } fn bench_big_schema_write_10_000_record(c: &mut Criterion) { bench_write(c, make_big_record, 10_000, "big schema, write 10k records").unwrap(); } fn bench_big_schema_write_10_000_record_ser(c: &mut Criterion) { bench_write_ser( c, make_big_record_ser, 10_000, "big scheam, write 10k records (serde way)", ) .unwrap(); } fn bench_big_schema_read_1_record(c: &mut Criterion) { bench_read(c, make_big_record, 1, "big schema, read 1 record").unwrap(); } fn bench_big_schema_read_100_record(c: &mut Criterion) { bench_read(c, make_big_record, 100, "big schema, read 100 records").unwrap(); } fn bench_big_schema_read_10_000_record(c: &mut Criterion) { bench_read(c, make_big_record, 10_000, "big schema, read 10k records").unwrap(); } fn bench_big_schema_read_100_000_record(c: &mut Criterion) { bench_read(c, make_big_record, 100_000, "big schema, read 100k records").unwrap(); } // This benchmark reads from the `benches/quickstop-null.avro` file, which was pulled from // the `goavro` project benchmarks: // https://github.com/linkedin/goavro/blob/master/fixtures/quickstop-null.avro // This was done for the sake of comparing this crate against the `goavro` implementation. fn bench_file_quickstop_null(c: &mut Criterion) { bench_from_file(c, "benches/quickstop-null.avro", "quickstop null file").unwrap(); } criterion_group!( benches, bench_small_schema_write_1_record, bench_small_schema_write_100_record, bench_small_schema_read_1_record, bench_small_schema_read_100_record, bench_big_schema_write_1_record, bench_big_schema_write_100_record, bench_big_schema_read_1_record, bench_big_schema_read_100_record, ); criterion_group!( benches_ser, bench_small_schema_write_1_record_ser, bench_small_schema_write_100_record_ser, bench_big_schema_write_1_record_ser, bench_big_schema_write_100_record_ser, ); criterion_group!( name = long_benches; config = Criterion::default().sample_size(20).measurement_time(Duration::from_secs(10)); targets = bench_file_quickstop_null, bench_small_schema_write_10_000_record, bench_small_schema_read_10_000_record, bench_big_schema_read_10_000_record, bench_big_schema_write_10_000_record ); criterion_group!( name = long_benches_ser; config = Criterion::default().sample_size(20).measurement_time(Duration::from_secs(10)); targets = bench_small_schema_write_10_000_record_ser, bench_big_schema_write_10_000_record_ser ); criterion_group!( name = very_long_benches; config = Criterion::default().sample_size(10).measurement_time(Duration::from_secs(20)); targets = bench_big_schema_read_100_000_record, ); criterion_main!( benches, benches_ser, long_benches, long_benches_ser, very_long_benches );