// arrow-avro/src/reader/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Read Avro data to Arrow
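//!
//! An Avro object container file begins with a header: the magic bytes
//! `Obj\x01`, a metadata map holding the writer schema and the optional
//! compression codec, and a 16-byte sync marker. The remainder of the file
//! is a sequence of blocks, each carrying a record count, the (possibly
//! compressed) encoded records, and a trailing copy of the sync marker.
//!
//! The helpers below decode those two layers separately. As a rough sketch
//! of how they fit together (they are currently crate-private; see the test
//! module for a runnable version):
//!
//! ```ignore
//! let mut reader = BufReader::new(File::open("data.avro")?);
//! let header = read_header(&mut reader)?;
//! let schema = header.schema()?.expect("file has an embedded schema");
//! let root = AvroField::try_from(&schema)?;
//! let mut decoder = RecordDecoder::try_new(root.data_type())?;
//! for block in read_blocks(reader) {
//!     let block = block?;
//!     // decompress `block.data` here if `header.compression()?` is Some
//!     decoder.decode(&block.data, block.count)?;
//! }
//! let batch = decoder.flush()?;
//! ```
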
use crate::reader::block::{Block, BlockDecoder};
use crate::reader::header::{Header, HeaderDecoder};
use arrow_schema::ArrowError;
use std::io::BufRead;

mod block;
mod cursor;
mod header;
mod record;
mod vlq;

/// Read a [`Header`] from the provided [`BufRead`]
fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
let mut decoder = HeaderDecoder::default();
loop {
let buf = reader.fill_buf()?;
if buf.is_empty() {
break;
}
let read = buf.len();
let decoded = decoder.decode(buf)?;
reader.consume(decoded);
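        // A partial consume means the decoder found the end of the header;
        // the unconsumed bytes belong to the first data block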
if decoded != read {
break;
}
}
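    // `flush` yields `None` if the input ended before a complete header was read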
decoder
.flush()
.ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
}

/// Return an iterator of [`Block`] from the provided [`BufRead`]
fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
let mut decoder = BlockDecoder::default();
let mut try_next = move || {
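        // Returns Ok(Some(block)) for each decoded block, Ok(None) once the
        // input is exhausted, and Err if the stream is malformed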
loop {
let buf = reader.fill_buf()?;
if buf.is_empty() {
break;
}
let read = buf.len();
let decoded = decoder.decode(buf)?;
reader.consume(decoded);
if decoded != read {
break;
}
}
Ok(decoder.flush())
};
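    // `transpose` turns Result<Option<Block>> into Option<Result<Block>>,
    // ending the iterator cleanly at end of input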
std::iter::from_fn(move || try_next().transpose())
}

#[cfg(test)]
mod test {
use crate::codec::AvroField;
use crate::compression::CompressionCodec;
use crate::reader::record::RecordDecoder;
use crate::reader::{read_blocks, read_header};
use crate::test_util::arrow_test_data;
use arrow_array::*;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
let file = File::open(file).unwrap();
let mut reader = BufReader::new(file);
let header = read_header(&mut reader).unwrap();
let compression = header.compression().unwrap();
let schema = header.schema().unwrap().unwrap();
let root = AvroField::try_from(&schema).unwrap();
let mut decoder = RecordDecoder::try_new(root.data_type()).unwrap();
for result in read_blocks(reader) {
let block = result.unwrap();
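            // Every block must end with the sync marker from the file header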
assert_eq!(block.sync, header.sync());
            // Decompress the block payload when the header declares a codec;
            // otherwise the block already contains the raw encoded records
            let block_data = if let Some(c) = compression {
                c.decompress(&block.data).unwrap()
            } else {
                block.data
            };
            let mut offset = 0;
            let mut remaining = block.count;
            while remaining > 0 {
                // Decode at most `batch_size` records per call so both the
                // full-block and partial-block paths are exercised
                let to_read = remaining.min(batch_size);
                offset += decoder
                    .decode(&block_data[offset..], to_read)
                    .unwrap();
                remaining -= to_read;
            }
            assert_eq!(offset, block_data.len());
}
decoder.flush().unwrap()
}

    #[test]
fn test_alltypes() {
let files = [
"avro/alltypes_plain.avro",
"avro/alltypes_plain.snappy.avro",
"avro/alltypes_plain.zstandard.avro",
];
let expected = RecordBatch::try_from_iter_with_nullable([
(
"id",
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 2, 3, 0, 1])) as _,
true,
),
(
"bool_col",
Arc::new(BooleanArray::from_iter((0..8).map(|x| Some(x % 2 == 0)))) as _,
true,
),
(
"tinyint_col",
Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
true,
),
(
"smallint_col",
Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
true,
),
(
"int_col",
Arc::new(Int32Array::from_iter_values((0..8).map(|x| x % 2))) as _,
true,
),
(
"bigint_col",
Arc::new(Int64Array::from_iter_values((0..8).map(|x| (x % 2) * 10))) as _,
true,
),
(
"float_col",
Arc::new(Float32Array::from_iter_values(
(0..8).map(|x| (x % 2) as f32 * 1.1),
)) as _,
true,
),
(
"double_col",
Arc::new(Float64Array::from_iter_values(
(0..8).map(|x| (x % 2) as f64 * 10.1),
)) as _,
true,
),
(
"date_string_col",
                Arc::new(BinaryArray::from_iter_values([
                    b"03/01/09",
                    b"03/01/09",
                    b"04/01/09",
                    b"04/01/09",
                    b"02/01/09",
                    b"02/01/09",
                    b"01/01/09",
                    b"01/01/09",
                ])) as _,
true,
),
(
"string_col",
                Arc::new(BinaryArray::from_iter_values((0..8).map(|x| [b'0' + x % 2]))) as _,
true,
),
(
"timestamp_col",
Arc::new(
TimestampMicrosecondArray::from_iter_values([
1235865600000000, // 2009-03-01T00:00:00.000
1235865660000000, // 2009-03-01T00:01:00.000
1238544000000000, // 2009-04-01T00:00:00.000
1238544060000000, // 2009-04-01T00:01:00.000
1233446400000000, // 2009-02-01T00:00:00.000
1233446460000000, // 2009-02-01T00:01:00.000
1230768000000000, // 2009-01-01T00:00:00.000
1230768060000000, // 2009-01-01T00:01:00.000
])
.with_timezone("+00:00"),
) as _,
true,
),
])
.unwrap();
for file in files {
let file = arrow_test_data(file);
assert_eq!(read_file(&file, 8), expected);
assert_eq!(read_file(&file, 3), expected);
}
}
}