// arrow-csv/src/reader/records.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow_schema::ArrowError;
use csv_core::{ReadRecordResult, Reader};
/// The estimated average length of a field in bytes
const AVERAGE_FIELD_SIZE: usize = 8;
/// The minimum amount of output data to reserve in a single pass
const MIN_CAPACITY: usize = 1024;
/// [`RecordDecoder`] provides a push-based interface to decode [`StringRecords`]
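///
/// A minimal usage sketch (marked `ignore` rather than run as a doc test; the
/// byte counts below assume the exact input shown):
///
/// ```ignore
/// use csv_core::Reader;
///
/// let mut decoder = RecordDecoder::new(Reader::new(), 2, false);
/// // Push bytes in: decode returns (records read, bytes consumed)
/// let (read, bytes) = decoder.decode(b"a,b\nc,d\n", 2).unwrap();
/// assert_eq!((read, bytes), (2, 8));
/// // An empty input signals EOF
/// decoder.decode(&[], 1).unwrap();
/// let records = decoder.flush().unwrap();
/// assert_eq!(records.len(), 2);
/// ```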
#[derive(Debug)]
pub struct RecordDecoder {
delimiter: Reader,
/// The expected number of fields per row
num_columns: usize,
/// The current line number
line_number: usize,
/// Offsets delimiting field start positions
offsets: Vec<usize>,
/// The current offset into `self.offsets`
///
/// We track this independently of Vec to avoid re-zeroing memory
offsets_len: usize,
/// The number of fields read for the current record
current_field: usize,
/// The number of rows buffered
num_rows: usize,
/// Decoded field data
data: Vec<u8>,
    /// The current offset into `self.data`
///
/// We track this independently of Vec to avoid re-zeroing memory
data_len: usize,
    /// Whether rows with fewer than the expected number of columns are considered valid
    ///
    /// Defaults to `false`. When enabled, missing columns are filled in with nulls
truncated_rows: bool,
}
impl RecordDecoder {
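    /// Creates a [`RecordDecoder`] that expects `num_columns` fields per record,
    /// padding short rows with nulls when `truncated_rows` is true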
pub fn new(delimiter: Reader, num_columns: usize, truncated_rows: bool) -> Self {
Self {
delimiter,
num_columns,
line_number: 1,
offsets: vec![],
offsets_len: 1, // The first offset is always 0
current_field: 0,
data_len: 0,
data: vec![],
num_rows: 0,
truncated_rows,
}
}
    /// Decodes records from `input`, returning the number of records read and the number of bytes consumed
///
/// Note: this expects to be called with an empty `input` to signal EOF
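    ///
    /// Returning fewer than `to_read` records indicates `input` was exhausted;
    /// callers should supply more bytes and call this method again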
pub fn decode(&mut self, input: &[u8], to_read: usize) -> Result<(usize, usize), ArrowError> {
if to_read == 0 {
return Ok((0, 0));
}
// Reserve sufficient capacity in offsets
self.offsets
.resize(self.offsets_len + to_read * self.num_columns, 0);
// The current offset into `input`
let mut input_offset = 0;
// The number of rows decoded in this pass
let mut read = 0;
loop {
// Reserve necessary space in output data based on best estimate
let remaining_rows = to_read - read;
let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE;
let estimated_data = capacity.max(MIN_CAPACITY);
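            // e.g. 100 remaining rows of 3 columns => max(100 * 3 * 8, 1024) = 2400 bytes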
self.data.resize(self.data_len + estimated_data, 0);
// Try to read a record
loop {
let (result, bytes_read, bytes_written, end_positions) =
self.delimiter.read_record(
&input[input_offset..],
&mut self.data[self.data_len..],
&mut self.offsets[self.offsets_len..],
);
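                // `end_positions` is the number of field end offsets csv_core
                // wrote into `self.offsets` (one per completed field)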
self.current_field += end_positions;
self.offsets_len += end_positions;
input_offset += bytes_read;
self.data_len += bytes_written;
match result {
ReadRecordResult::End | ReadRecordResult::InputEmpty => {
// Reached end of input
return Ok((read, input_offset));
}
// Need to allocate more capacity
ReadRecordResult::OutputFull => break,
ReadRecordResult::OutputEndsFull => {
return Err(ArrowError::CsvError(format!(
"incorrect number of fields for line {}, expected {} got more than {}",
self.line_number, self.num_columns, self.current_field
)));
}
ReadRecordResult::Record => {
if self.current_field != self.num_columns {
if self.truncated_rows && self.current_field < self.num_columns {
                                // If the number of fields is less than expected, pad with
                                // nulls by repeating the last end offset (empty fields)
let fill_count = self.num_columns - self.current_field;
let fill_value = self.offsets[self.offsets_len - 1];
self.offsets[self.offsets_len..self.offsets_len + fill_count]
.fill(fill_value);
self.offsets_len += fill_count;
} else {
return Err(ArrowError::CsvError(format!(
"incorrect number of fields for line {}, expected {} got {}",
self.line_number, self.num_columns, self.current_field
)));
}
}
read += 1;
self.current_field = 0;
self.line_number += 1;
self.num_rows += 1;
if read == to_read {
// Read sufficient rows
return Ok((read, input_offset));
}
if input.len() == input_offset {
// Input exhausted, need to read more
                            // Without this check, read_record would interpret the
                            // empty input byte array as indicating the end of the file
return Ok((read, input_offset));
}
}
}
}
}
}
/// Returns the current number of buffered records
pub fn len(&self) -> usize {
self.num_rows
}
/// Returns true if the decoder is empty
pub fn is_empty(&self) -> bool {
self.num_rows == 0
}
/// Clears the current contents of the decoder
pub fn clear(&mut self) {
// This does not reset current_field to allow clearing part way through a record
self.offsets_len = 1;
self.data_len = 0;
self.num_rows = 0;
}
    /// Flushes the currently buffered records out of the decoder
pub fn flush(&mut self) -> Result<StringRecords<'_>, ArrowError> {
if self.current_field != 0 {
return Err(ArrowError::CsvError(
"Cannot flush part way through record".to_string(),
));
}
// csv_core::Reader writes end offsets relative to the start of the row
// Therefore scan through and offset these based on the cumulative row offsets
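        // e.g. rows ["a", "bb"] and ["ccc", ""] arrive as per-row ends
        // [1, 3] and [3, 3], which this scan rewrites to [1, 3, 6, 6]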
let mut row_offset = 0;
self.offsets[1..self.offsets_len]
.chunks_exact_mut(self.num_columns)
.for_each(|row| {
let offset = row_offset;
row.iter_mut().for_each(|x| {
*x += offset;
row_offset = *x;
});
});
        // Need to truncate data to the actual amount of data read
let data = std::str::from_utf8(&self.data[..self.data_len]).map_err(|e| {
let valid_up_to = e.valid_up_to();
            // We can't use binary search because empty fields produce duplicate offsets
let idx = self.offsets[..self.offsets_len]
.iter()
.rposition(|x| *x <= valid_up_to)
.unwrap();
let field = idx % self.num_columns + 1;
let line_offset = self.line_number - self.num_rows;
let line = line_offset + idx / self.num_columns;
ArrowError::CsvError(format!(
"Encountered invalid UTF-8 data for line {line} and field {field}"
))
})?;
let offsets = &self.offsets[..self.offsets_len];
let num_rows = self.num_rows;
// Reset state
self.offsets_len = 1;
self.data_len = 0;
self.num_rows = 0;
Ok(StringRecords {
num_rows,
num_columns: self.num_columns,
offsets,
data,
})
}
}
/// A collection of parsed, UTF-8 CSV records
#[derive(Debug)]
pub struct StringRecords<'a> {
num_columns: usize,
num_rows: usize,
offsets: &'a [usize],
data: &'a str,
}
impl<'a> StringRecords<'a> {
fn get(&self, index: usize) -> StringRecord<'a> {
let field_idx = index * self.num_columns;
StringRecord {
data: self.data,
offsets: &self.offsets[field_idx..field_idx + self.num_columns + 1],
}
}
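    /// Returns the number of records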
pub fn len(&self) -> usize {
self.num_rows
}
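    /// Returns an iterator over the records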
pub fn iter(&self) -> impl Iterator<Item = StringRecord<'a>> + '_ {
(0..self.num_rows).map(|x| self.get(x))
}
}
/// A single parsed, UTF-8 CSV record
#[derive(Debug, Clone, Copy)]
pub struct StringRecord<'a> {
data: &'a str,
offsets: &'a [usize],
}
impl<'a> StringRecord<'a> {
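    /// Returns the string value of the field at `index`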
pub fn get(&self, index: usize) -> &'a str {
let end = self.offsets[index + 1];
let start = self.offsets[index];
// SAFETY:
// Parsing produces offsets at valid byte boundaries
unsafe { self.data.get_unchecked(start..end) }
}
}
impl std::fmt::Display for StringRecord<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let num_fields = self.offsets.len() - 1;
write!(f, "[")?;
for i in 0..num_fields {
if i > 0 {
write!(f, ",")?;
}
write!(f, "{}", self.get(i))?;
}
write!(f, "]")?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::reader::records::RecordDecoder;
use csv_core::Reader;
use std::io::{BufRead, BufReader, Cursor};
#[test]
fn test_basic() {
let csv = [
"foo,bar,baz",
"a,b,c",
"12,3,5",
"\"asda\"\"asas\",\"sdffsnsd\", as",
]
.join("\n");
let mut expected = vec![
vec!["foo", "bar", "baz"],
vec!["a", "b", "c"],
vec!["12", "3", "5"],
vec!["asda\"asas", "sdffsnsd", " as"],
]
.into_iter();
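        // A 3-byte buffer forces `fill_buf` to return partial records,
        // exercising the incremental decode path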
let mut reader = BufReader::with_capacity(3, Cursor::new(csv.as_bytes()));
let mut decoder = RecordDecoder::new(Reader::new(), 3, false);
loop {
let to_read = 3;
let mut read = 0;
loop {
let buf = reader.fill_buf().unwrap();
let (records, bytes) = decoder.decode(buf, to_read - read).unwrap();
reader.consume(bytes);
read += records;
if read == to_read || bytes == 0 {
break;
}
}
if read == 0 {
break;
}
let b = decoder.flush().unwrap();
b.iter().zip(&mut expected).for_each(|(record, expected)| {
let actual = (0..3)
.map(|field_idx| record.get(field_idx))
.collect::<Vec<_>>();
assert_eq!(actual, expected)
});
}
assert!(expected.next().is_none());
}
#[test]
fn test_invalid_fields() {
let csv = "a,b\nb,c\na\n";
let mut decoder = RecordDecoder::new(Reader::new(), 2, false);
let err = decoder.decode(csv.as_bytes(), 4).unwrap_err().to_string();
let expected = "Csv error: incorrect number of fields for line 3, expected 2 got 1";
assert_eq!(err, expected);
// Test with initial skip
let mut decoder = RecordDecoder::new(Reader::new(), 2, false);
let (skipped, bytes) = decoder.decode(csv.as_bytes(), 1).unwrap();
assert_eq!(skipped, 1);
decoder.clear();
let remaining = &csv.as_bytes()[bytes..];
let err = decoder.decode(remaining, 3).unwrap_err().to_string();
assert_eq!(err, expected);
}
#[test]
fn test_skip_insufficient_rows() {
let csv = "a\nv\n";
let mut decoder = RecordDecoder::new(Reader::new(), 1, false);
let (read, bytes) = decoder.decode(csv.as_bytes(), 3).unwrap();
assert_eq!(read, 2);
assert_eq!(bytes, csv.len());
}
#[test]
fn test_truncated_rows() {
let csv = "a,b\nv\n,1\n,2\n,3\n";
let mut decoder = RecordDecoder::new(Reader::new(), 2, true);
let (read, bytes) = decoder.decode(csv.as_bytes(), 5).unwrap();
assert_eq!(read, 5);
assert_eq!(bytes, csv.len());
}
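    // A minimal additional sketch (not part of the original suite) exercising
    // the EOF contract documented on `decode`: an empty input slice signals EOF
    #[test]
    fn test_eof_flush_sketch() {
        let csv = "a,b\n";
        let mut decoder = RecordDecoder::new(Reader::new(), 2, false);
        let (read, bytes) = decoder.decode(csv.as_bytes(), 2).unwrap();
        assert_eq!((read, bytes), (1, csv.len()));
        // An empty input tells the decoder no more data is coming
        let (read, bytes) = decoder.decode(&[], 1).unwrap();
        assert_eq!((read, bytes), (0, 0));
        let records = decoder.flush().unwrap();
        assert_eq!(records.len(), 1);
        let record = records.iter().next().unwrap();
        assert_eq!(record.get(0), "a");
        assert_eq!(record.get(1), "b");
    }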
}