avro/src/util.rs (206 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::{schema::Documentation, AvroResult, Error};
use serde_json::{Map, Value};
use std::{
io::{Read, Write},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Once,
},
};
/// Default maximum number of bytes (512 MiB) that can be allocated when decoding
/// Avro-encoded values. This is a protection against ill-formed
/// data, whose length field might be interpreted as enormous.
/// See [max_allocation_bytes] to change this limit.
pub const DEFAULT_MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;
/// Allocation limit currently in effect. Starts at [DEFAULT_MAX_ALLOCATION_BYTES]
/// and is only written inside the `MAX_ALLOCATION_BYTES_ONCE` guard of
/// [max_allocation_bytes].
static MAX_ALLOCATION_BYTES: AtomicUsize = AtomicUsize::new(DEFAULT_MAX_ALLOCATION_BYTES);
/// Guard ensuring that `MAX_ALLOCATION_BYTES` is stored at most once.
static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();
/// Whether to set serialization & deserialization traits
/// as `human_readable` or not. Defaults to `true`.
/// See [set_serde_human_readable] to change this value.
// crate-visible for testing
pub(crate) static SERDE_HUMAN_READABLE: AtomicBool = AtomicBool::new(true);
/// Guard ensuring that [set_serde_human_readable] stores into
/// `SERDE_HUMAN_READABLE` at most once.
static SERDE_HUMAN_READABLE_ONCE: Once = Once::new();
/// Convenience accessors for pulling well-known string attributes out of a
/// keyed collection such as a JSON object.
pub trait MapHelper {
    /// Returns an owned copy of the string stored under `key`, or `None`
    /// when the implementor has no string to offer for that key.
    fn string(&self, key: &str) -> Option<String>;

    /// Returns the string stored under the `"name"` key, if any.
    fn name(&self) -> Option<String> {
        Self::string(self, "name")
    }

    /// Returns the string stored under the `"doc"` key, if any.
    fn doc(&self) -> Documentation {
        Self::string(self, "doc")
    }

    /// Returns the list of aliases, or `None` when the implementor has no
    /// usable alias list.
    fn aliases(&self) -> Option<Vec<String>>;
}
impl MapHelper for Map<String, Value> {
    /// Looks up `key` and returns an owned copy of its value when that value
    /// is a JSON string. Yields `None` when the key is absent or holds any
    /// other JSON type.
    fn string(&self, key: &str) -> Option<String> {
        let text = self.get(key)?.as_str()?;
        Some(text.to_owned())
    }

    /// Collects the `"aliases"` entry into a vector of owned strings.
    ///
    /// Yields `None` when the key is absent, when its value is not a JSON
    /// array, or when any element of the array is not a JSON string.
    fn aliases(&self) -> Option<Vec<String>> {
        // FIXME no warning when aliases aren't a json array of json strings
        let entries = self.get("aliases")?.as_array()?;
        let mut names = Vec::with_capacity(entries.len());
        for entry in entries {
            // A single non-string element invalidates the whole list.
            names.push(entry.as_str()?.to_owned());
        }
        Some(names)
    }
}
/// Reads one zig-zag, variable-length encoded 64-bit integer from `reader`.
///
/// This simply forwards to [`zag_i64`]; its result, including any error,
/// is returned unchanged.
pub fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
    zag_i64::<R>(reader)
}
/// Zig-zag encodes a 32-bit integer and writes it to `buffer`.
///
/// The value is widened losslessly to 64 bits and handed to [`zig_i64`],
/// so both widths produce the same bytes for the same numeric value.
/// Returns whatever [`zig_i64`] returns.
pub fn zig_i32<W: Write>(n: i32, buffer: W) -> AvroResult<usize> {
    let widened = i64::from(n);
    zig_i64(widened, buffer)
}
/// Zig-zag encodes a 64-bit integer and writes it to `writer` as a
/// variable-length integer.
///
/// Zig-zag maps signed values onto unsigned ones so that small magnitudes,
/// positive or negative, get short encodings: 0 -> 0, -1 -> 1, 1 -> 2, ...
/// Returns the result of `encode_variable`.
pub fn zig_i64<W: Write>(n: i64, writer: W) -> AvroResult<usize> {
    // Arithmetic shift: all zeros for non-negative `n`, all ones otherwise.
    let sign_mask = n >> 63;
    let zigzag = (n << 1) ^ sign_mask;
    encode_variable(zigzag as u64, writer)
}
pub fn zag_i32<R: Read>(reader: &mut R) -> AvroResult<i32> {
let i = zag_i64(reader)?;
i32::try_from(i).map_err(|e| Error::ZagI32(e, i))
}
/// Reads a variable-length integer from `reader` and undoes the zig-zag
/// mapping, yielding the original signed 64-bit value.
///
/// # Errors
///
/// Propagates any error from `decode_variable`.
pub fn zag_i64<R: Read>(reader: &mut R) -> AvroResult<i64> {
    let raw = decode_variable(reader)?;
    // The upper 63 bits carry the magnitude, the lowest bit carries the sign.
    let magnitude = (raw >> 1) as i64;
    // 0 when the sign bit is clear, -1 (all ones) when it is set; XOR-ing with
    // all ones is a bitwise NOT, which restores the negative value.
    let sign_mask = -((raw & 0x1) as i64);
    Ok(magnitude ^ sign_mask)
}
fn encode_variable<W: Write>(mut z: u64, mut writer: W) -> AvroResult<usize> {
let mut buffer = [0u8; 10];
let mut i: usize = 0;
loop {
if z <= 0x7F {
buffer[i] = (z & 0x7F) as u8;
i += 1;
break;
} else {
buffer[i] = (0x80 | (z & 0x7F)) as u8;
i += 1;
z >>= 7;
}
}
writer.write(&buffer[..i]).map_err(Error::WriteBytes)
}
fn decode_variable<R: Read>(reader: &mut R) -> AvroResult<u64> {
let mut i = 0u64;
let mut buf = [0u8; 1];
let mut j = 0;
loop {
if j > 9 {
// if j * 7 > 64
return Err(Error::IntegerOverflow);
}
reader
.read_exact(&mut buf[..])
.map_err(Error::ReadVariableIntegerBytes)?;
i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
if (buf[0] >> 7) == 0 {
break;
} else {
j += 1;
}
}
Ok(i)
}
/// Installs the maximum number of bytes that may be allocated when decoding
/// data, and returns the limit that is actually in effect.
///
/// Only the first call has any effect: the limit is stored under a
/// [`std::sync::Once`](https://doc.rust-lang.org/std/sync/struct.Once.html)
/// guard, so every later call ignores `num_bytes` and just reports the value
/// that was installed first.
///
/// **NOTE** This function must be called before decoding **any** data.
/// [`safe_len`] calls it with [`DEFAULT_MAX_ALLOCATION_BYTES`], so the first
/// length check fixes the limit to the default if nothing was set earlier.
pub fn max_allocation_bytes(num_bytes: usize) -> usize {
    let install_limit = || MAX_ALLOCATION_BYTES.store(num_bytes, Ordering::Release);
    MAX_ALLOCATION_BYTES_ONCE.call_once(install_limit);
    MAX_ALLOCATION_BYTES.load(Ordering::Acquire)
}
pub fn safe_len(len: usize) -> AvroResult<usize> {
let max_bytes = max_allocation_bytes(DEFAULT_MAX_ALLOCATION_BYTES);
if len <= max_bytes {
Ok(len)
} else {
Err(Error::MemoryAllocation {
desired: len,
maximum: max_bytes,
})
}
}
/// Chooses whether serializing/deserializing is reported as human readable in
/// the serde traits, i.e. the return value of `is_human_readable()` for both.
///
/// Only the first call has any effect: the flag is stored under a
/// [`std::sync::Once`](https://doc.rust-lang.org/std/sync/struct.Once.html)
/// guard, so later calls are silently ignored. Until the first call the flag
/// keeps its initial value of `true`.
///
/// **NOTE** This function must be called before serializing/deserializing
/// **any** data.
pub fn set_serde_human_readable(human_readable: bool) {
    let apply = || SERDE_HUMAN_READABLE.store(human_readable, Ordering::Release);
    SERDE_HUMAN_READABLE_ONCE.call_once(apply);
}
/// Reports the current value of the `SERDE_HUMAN_READABLE` flag.
pub(crate) fn is_human_readable() -> bool {
    let flag = &SERDE_HUMAN_READABLE;
    flag.load(Ordering::Acquire)
}
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    /// Zig-zag encodes `n` as a 64-bit value into a fresh buffer.
    fn encoded_i64(n: i64) -> Vec<u8> {
        let mut out = Vec::new();
        zig_i64(n, &mut out).unwrap();
        out
    }

    /// Zig-zag encodes `n` as a 32-bit value into a fresh buffer.
    fn encoded_i32(n: i32) -> Vec<u8> {
        let mut out = Vec::new();
        zig_i32(n, &mut out).unwrap();
        out
    }

    /// Both integer widths must produce the same bytes for the same value.
    #[test]
    fn test_zigzag() {
        assert_eq!(encoded_i32(42i32), encoded_i64(42i64));
    }

    /// Known encodings around zero, the one-byte boundary, the `i32` range
    /// limits and the `i64` range limits.
    #[test]
    fn test_zig_i64() {
        assert_eq!(encoded_i64(0), [0]);
        assert_eq!(encoded_i64(-1), [1]);
        assert_eq!(encoded_i64(1), [2]);
        assert_eq!(encoded_i64(-64), [127]);
        assert_eq!(encoded_i64(64), [128, 1]);
        assert_eq!(encoded_i64(i32::MAX as i64), [254, 255, 255, 255, 15]);
        assert_eq!(encoded_i64(i32::MAX as i64 + 1), [128, 128, 128, 128, 16]);
        assert_eq!(encoded_i64(i32::MIN as i64), [255, 255, 255, 255, 15]);
        assert_eq!(encoded_i64(i32::MIN as i64 - 1), [129, 128, 128, 128, 16]);
        assert_eq!(
            encoded_i64(i64::MAX),
            [254, 255, 255, 255, 255, 255, 255, 255, 255, 1]
        );
        assert_eq!(
            encoded_i64(i64::MIN),
            [255, 255, 255, 255, 255, 255, 255, 255, 255, 1]
        );
    }

    /// Known encodings around half of the `i32` range and at its limits.
    #[test]
    fn test_zig_i32() {
        assert_eq!(encoded_i32(i32::MAX / 2), [254, 255, 255, 255, 7]);
        assert_eq!(encoded_i32(i32::MIN / 2), [255, 255, 255, 255, 7]);
        assert_eq!(encoded_i32(-(i32::MIN / 2)), [128, 128, 128, 128, 8]);
        assert_eq!(encoded_i32(i32::MIN / 2 - 1), [129, 128, 128, 128, 8]);
        assert_eq!(encoded_i32(i32::MAX), [254, 255, 255, 255, 15]);
        assert_eq!(encoded_i32(i32::MIN), [255, 255, 255, 255, 15]);
    }

    /// Every byte has its continuation bit set and the input ends after five
    /// bytes, so decoding must report an error instead of a value.
    #[test]
    fn test_overflow() {
        let mut causes_left_shift_overflow: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1];
        let decoded = decode_variable(&mut causes_left_shift_overflow);
        assert!(decoded.is_err());
    }

    /// A small length is accepted unchanged; 1 GiB is rejected.
    #[test]
    fn test_safe_len() -> TestResult {
        let accepted = safe_len(42usize)?;
        assert_eq!(42usize, accepted);

        let rejected = safe_len(1024 * 1024 * 1024);
        assert!(rejected.is_err());

        Ok(())
    }
}