in datafusion/core/src/datasource/physical_plan/csv.rs [449:589]
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
// `self.config.has_header` controls whether to skip the first line as a header.
// If the CSV file is read in parallel and this `CsvOpener` is reading some middle
// partition, its first line is ordinary data rather than the header, so don't skip it.
let mut csv_has_header = self.config.has_header;
if let Some(FileRange { start, .. }) = file_meta.range {
if start != 0 {
csv_has_header = false;
}
}
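// Example: a CSV file split into byte ranges [0, 1000), [1000, 2000), and
// [2000, 3000) is opened by three `CsvOpener`s; only the opener whose range
// starts at byte 0 can see the header row, so only it keeps `has_header = true`.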
let config = CsvConfig {
has_header: csv_has_header,
..(*self.config).clone()
};
let file_compression_type = self.file_compression_type.to_owned();
if file_meta.range.is_some() {
assert!(
!file_compression_type.is_compressed(),
"Reading compressed .csv in parallel is not supported"
);
}
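// (A byte offset into a compressed stream cannot be mapped to a row boundary
// without decompressing from the start, so ranged reads are only meaningful
// on uncompressed files.)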
Ok(Box::pin(async move {
let file_size = file_meta.object_meta.size;
// The current partition covers bytes [start_byte, end_byte); the raw range may cut through a line at either boundary
let (start_byte, end_byte) = match file_meta.range {
None => (0, file_size),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);
// The partition byte range is [start, end); either boundary may fall in the
// middle of a line, so shift both forward to the nearest following newline.
let start_delta = if start != 0 {
find_first_newline(
&config.object_store,
file_meta.location(),
start - 1,
file_size,
)
.await?
} else {
0
};
let end_delta = if end != file_size {
find_first_newline(
&config.object_store,
file_meta.location(),
end - 1,
file_size,
)
.await?
} else {
0
};
(start + start_delta, end + end_delta)
}
};
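// Worked example (hypothetical contents), assuming `find_first_newline`
// returns the offset of the first b'\n' relative to the byte where it starts
// scanning: for the 11-byte file b"aa\nbbbb\ncc\n" and the raw range [4, 9),
// the start scan begins at byte 3 and finds '\n' at byte 7, so start_delta = 4
// and the adjusted start is 4 + 4 = 8 (just past the newline); the end scan
// begins at byte 8 and finds '\n' at byte 10, so end_delta = 2 and the
// adjusted end is 9 + 2 = 11. This partition reads exactly "cc\n", and the
// neighbouring partitions' boundaries shift to the same newlines, so no line
// is lost or read twice.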
// Special case: if `Range` has equal `start` and `end`, the object store
// will fetch the whole file, so for non-local stores return an empty stream
// instead (the local filesystem is exempted via the type comparison below)
let localfs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let is_localfs = localfs.type_id() == config.object_store.type_id();
if start_byte == end_byte && !is_localfs {
return Ok(futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed());
}
let options = GetOptions {
range: Some(Range {
start: start_byte,
end: end_byte,
}),
..Default::default()
};
match config
.object_store
.get_opts(file_meta.location(), options)
.await?
{
GetResult::File(file, _) => {
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
// Special case: for compressed files, `get_range()` would interpret `start`
// and `end` as the byte range after decompression, so read through the file
// handle directly instead
file_compression_type.convert_read(file)?
} else {
// The requested range is currently ignored for `GetResult::File(...)`,
// so fetch the exact byte range explicitly
let bytes = Cursor::new(
config
.object_store
.get_range(
file_meta.location(),
Range {
start: start_byte,
end: end_byte,
},
)
.await?,
);
file_compression_type.convert_read(bytes)?
};
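// `config.open(decoder)` presumably wraps the (decompressed) reader in a
// synchronous Arrow CSV reader; `stream::iter` adapts its iterator of
// record batches into the stream this opener must return.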
Ok(futures::stream::iter(config.open(decoder)?).boxed())
}
GetResult::Stream(s) => {
let mut decoder = config.builder().build_decoder();
let s = s.map_err(DataFusionError::from);
let mut input =
file_compression_type.convert_stream(s.boxed())?.fuse();
let mut buffered = Bytes::new();
let s = futures::stream::poll_fn(move |cx| {
loop {
if buffered.is_empty() {
match ready!(input.poll_next_unpin(cx)) {
Some(Ok(b)) => buffered = b,
Some(Err(e)) => {
return Poll::Ready(Some(Err(e.into())))
}
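// Input exhausted: leave `buffered` empty so the decoder below is
// called with an empty slice, which delimits the final record.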
None => {}
};
}
let decoded = match decoder.decode(buffered.as_ref()) {
// Note: the decoder needs to be called with an empty
// slice to delimit the final record
Ok(0) => break,
Ok(decoded) => decoded,
Err(e) => return Poll::Ready(Some(Err(e))),
};
buffered.advance(decoded);
}
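// The loop only breaks once the decoder has consumed all input (including
// the empty-slice delimiter), so emit whatever record batch it still buffers.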
Poll::Ready(decoder.flush().transpose())
});
Ok(s.boxed())
}
}
}))
}
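
// `find_first_newline` is not shown in this excerpt. Below is a minimal sketch
// of the behaviour the call sites above assume; the name suffix, exact
// signature, and use of `GetResult::into_stream` are assumptions, not the
// actual helper. It streams bytes starting at `start_byte` and returns the
// offset of the first b'\n' relative to `start_byte`, or the number of bytes
// scanned if the stream ends without one.
async fn find_first_newline_sketch(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start_byte: usize,
    end_byte: usize,
) -> Result<usize> {
    // Fetch only the bytes that could contain the boundary newline
    let options = GetOptions {
        range: Some(Range {
            start: start_byte,
            end: end_byte,
        }),
        ..Default::default()
    };
    let mut stream = object_store
        .get_opts(location, options)
        .await?
        .into_stream();
    // Count bytes until the first newline appears in some chunk
    let mut offset = 0;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        if let Some(pos) = chunk.iter().position(|b| *b == b'\n') {
            return Ok(offset + pos);
        }
        offset += chunk.len();
    }
    Ok(offset)
}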