arrow-csv/src/reader/records.rs (282 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use arrow_schema::ArrowError; use csv_core::{ReadRecordResult, Reader}; /// The estimated length of a field in bytes const AVERAGE_FIELD_SIZE: usize = 8; /// The minimum amount of data in a single read const MIN_CAPACITY: usize = 1024; /// [`RecordDecoder`] provides a push-based interface to decoder [`StringRecords`] #[derive(Debug)] pub struct RecordDecoder { delimiter: Reader, /// The expected number of fields per row num_columns: usize, /// The current line number line_number: usize, /// Offsets delimiting field start positions offsets: Vec<usize>, /// The current offset into `self.offsets` /// /// We track this independently of Vec to avoid re-zeroing memory offsets_len: usize, /// The number of fields read for the current record current_field: usize, /// The number of rows buffered num_rows: usize, /// Decoded field data data: Vec<u8>, /// Offsets into data /// /// We track this independently of Vec to avoid re-zeroing memory data_len: usize, /// Whether rows with less than expected columns are considered valid /// /// Default value is false /// When enabled fills in missing columns with null truncated_rows: bool, } impl RecordDecoder { pub fn new(delimiter: Reader, num_columns: usize, truncated_rows: bool) -> Self { Self { delimiter, num_columns, line_number: 1, offsets: vec![], offsets_len: 1, // The first offset is always 0 current_field: 0, data_len: 0, data: vec![], num_rows: 0, truncated_rows, } } /// Decodes records from `input` returning the number of records and bytes read /// /// Note: this expects to be called with an empty `input` to signal EOF pub fn decode(&mut self, input: &[u8], to_read: usize) -> Result<(usize, usize), ArrowError> { if to_read == 0 { return Ok((0, 0)); } // Reserve sufficient capacity in offsets self.offsets .resize(self.offsets_len + to_read * self.num_columns, 0); // The current offset into `input` let mut input_offset = 0; // The number of rows decoded in this pass let mut read = 0; loop { // Reserve necessary space in output data based on best estimate let remaining_rows = to_read - read; let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE; let estimated_data = capacity.max(MIN_CAPACITY); self.data.resize(self.data_len + estimated_data, 0); // Try to read a record loop { let (result, bytes_read, bytes_written, end_positions) = self.delimiter.read_record( &input[input_offset..], &mut self.data[self.data_len..], &mut self.offsets[self.offsets_len..], ); self.current_field += end_positions; self.offsets_len += end_positions; input_offset += bytes_read; self.data_len += bytes_written; match result { ReadRecordResult::End | ReadRecordResult::InputEmpty => { // Reached end of input return Ok((read, input_offset)); } // Need to allocate more capacity ReadRecordResult::OutputFull => break, ReadRecordResult::OutputEndsFull => { return Err(ArrowError::CsvError(format!( "incorrect number of fields for line {}, expected {} got more than {}", self.line_number, self.num_columns, self.current_field ))); } ReadRecordResult::Record => { if self.current_field != self.num_columns { if self.truncated_rows && self.current_field < self.num_columns { // If the number of fields is less than expected, pad with nulls let fill_count = self.num_columns - self.current_field; let fill_value = self.offsets[self.offsets_len - 1]; self.offsets[self.offsets_len..self.offsets_len + fill_count] .fill(fill_value); self.offsets_len += fill_count; } else { return Err(ArrowError::CsvError(format!( "incorrect number of fields for line {}, expected {} got {}", self.line_number, self.num_columns, self.current_field ))); } } read += 1; self.current_field = 0; self.line_number += 1; self.num_rows += 1; if read == to_read { // Read sufficient rows return Ok((read, input_offset)); } if input.len() == input_offset { // Input exhausted, need to read more // Without this read_record will interpret the empty input // byte array as indicating the end of the file return Ok((read, input_offset)); } } } } } } /// Returns the current number of buffered records pub fn len(&self) -> usize { self.num_rows } /// Returns true if the decoder is empty pub fn is_empty(&self) -> bool { self.num_rows == 0 } /// Clears the current contents of the decoder pub fn clear(&mut self) { // This does not reset current_field to allow clearing part way through a record self.offsets_len = 1; self.data_len = 0; self.num_rows = 0; } /// Flushes the current contents of the reader pub fn flush(&mut self) -> Result<StringRecords<'_>, ArrowError> { if self.current_field != 0 { return Err(ArrowError::CsvError( "Cannot flush part way through record".to_string(), )); } // csv_core::Reader writes end offsets relative to the start of the row // Therefore scan through and offset these based on the cumulative row offsets let mut row_offset = 0; self.offsets[1..self.offsets_len] .chunks_exact_mut(self.num_columns) .for_each(|row| { let offset = row_offset; row.iter_mut().for_each(|x| { *x += offset; row_offset = *x; }); }); // Need to truncate data t1o the actual amount of data read let data = std::str::from_utf8(&self.data[..self.data_len]).map_err(|e| { let valid_up_to = e.valid_up_to(); // We can't use binary search because of empty fields let idx = self.offsets[..self.offsets_len] .iter() .rposition(|x| *x <= valid_up_to) .unwrap(); let field = idx % self.num_columns + 1; let line_offset = self.line_number - self.num_rows; let line = line_offset + idx / self.num_columns; ArrowError::CsvError(format!( "Encountered invalid UTF-8 data for line {line} and field {field}" )) })?; let offsets = &self.offsets[..self.offsets_len]; let num_rows = self.num_rows; // Reset state self.offsets_len = 1; self.data_len = 0; self.num_rows = 0; Ok(StringRecords { num_rows, num_columns: self.num_columns, offsets, data, }) } } /// A collection of parsed, UTF-8 CSV records #[derive(Debug)] pub struct StringRecords<'a> { num_columns: usize, num_rows: usize, offsets: &'a [usize], data: &'a str, } impl<'a> StringRecords<'a> { fn get(&self, index: usize) -> StringRecord<'a> { let field_idx = index * self.num_columns; StringRecord { data: self.data, offsets: &self.offsets[field_idx..field_idx + self.num_columns + 1], } } pub fn len(&self) -> usize { self.num_rows } pub fn iter(&self) -> impl Iterator<Item = StringRecord<'a>> + '_ { (0..self.num_rows).map(|x| self.get(x)) } } /// A single parsed, UTF-8 CSV record #[derive(Debug, Clone, Copy)] pub struct StringRecord<'a> { data: &'a str, offsets: &'a [usize], } impl<'a> StringRecord<'a> { pub fn get(&self, index: usize) -> &'a str { let end = self.offsets[index + 1]; let start = self.offsets[index]; // SAFETY: // Parsing produces offsets at valid byte boundaries unsafe { self.data.get_unchecked(start..end) } } } impl std::fmt::Display for StringRecord<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let num_fields = self.offsets.len() - 1; write!(f, "[")?; for i in 0..num_fields { if i > 0 { write!(f, ",")?; } write!(f, "{}", self.get(i))?; } write!(f, "]")?; Ok(()) } } #[cfg(test)] mod tests { use crate::reader::records::RecordDecoder; use csv_core::Reader; use std::io::{BufRead, BufReader, Cursor}; #[test] fn test_basic() { let csv = [ "foo,bar,baz", "a,b,c", "12,3,5", "\"asda\"\"asas\",\"sdffsnsd\", as", ] .join("\n"); let mut expected = vec![ vec!["foo", "bar", "baz"], vec!["a", "b", "c"], vec!["12", "3", "5"], vec!["asda\"asas", "sdffsnsd", " as"], ] .into_iter(); let mut reader = BufReader::with_capacity(3, Cursor::new(csv.as_bytes())); let mut decoder = RecordDecoder::new(Reader::new(), 3, false); loop { let to_read = 3; let mut read = 0; loop { let buf = reader.fill_buf().unwrap(); let (records, bytes) = decoder.decode(buf, to_read - read).unwrap(); reader.consume(bytes); read += records; if read == to_read || bytes == 0 { break; } } if read == 0 { break; } let b = decoder.flush().unwrap(); b.iter().zip(&mut expected).for_each(|(record, expected)| { let actual = (0..3) .map(|field_idx| record.get(field_idx)) .collect::<Vec<_>>(); assert_eq!(actual, expected) }); } assert!(expected.next().is_none()); } #[test] fn test_invalid_fields() { let csv = "a,b\nb,c\na\n"; let mut decoder = RecordDecoder::new(Reader::new(), 2, false); let err = decoder.decode(csv.as_bytes(), 4).unwrap_err().to_string(); let expected = "Csv error: incorrect number of fields for line 3, expected 2 got 1"; assert_eq!(err, expected); // Test with initial skip let mut decoder = RecordDecoder::new(Reader::new(), 2, false); let (skipped, bytes) = decoder.decode(csv.as_bytes(), 1).unwrap(); assert_eq!(skipped, 1); decoder.clear(); let remaining = &csv.as_bytes()[bytes..]; let err = decoder.decode(remaining, 3).unwrap_err().to_string(); assert_eq!(err, expected); } #[test] fn test_skip_insufficient_rows() { let csv = "a\nv\n"; let mut decoder = RecordDecoder::new(Reader::new(), 1, false); let (read, bytes) = decoder.decode(csv.as_bytes(), 3).unwrap(); assert_eq!(read, 2); assert_eq!(bytes, csv.len()); } #[test] fn test_truncated_rows() { let csv = "a,b\nv\n,1\n,2\n,3\n"; let mut decoder = RecordDecoder::new(Reader::new(), 2, true); let (read, bytes) = decoder.decode(csv.as_bytes(), 5).unwrap(); assert_eq!(read, 5); assert_eq!(bytes, csv.len()); } }