in datafusion/core/src/datasource/physical_plan/file_stream.rs [310:496]
/// Drives the file-stream state machine held in `self.state` until it can
/// hand a result back to the caller.
///
/// The body loops over the current state; a transition that does not
/// `return` simply re-enters the loop and is handled immediately in the
/// same call. The states handled are:
///
/// * `Idle`  - no file is open; try to start opening the next one.
/// * `Open`  - a file-open future is in flight; poll it for a reader.
/// * `Scan`  - a reader is producing batches; the open future for the
///   following file (if any) is polled alongside it.
/// * `Error` / `Limit` - terminal; always yields `None`.
///
/// # Returns
///
/// * `Poll::Ready(Some(Ok(batch)))` - a batch after partition-value
///   projection and after applying the remaining-row limit in `self.remain`.
/// * `Poll::Ready(Some(Err(e)))` - an open/scan/projection error; the state
///   is set to `Error` first, so the next poll yields `None`.
/// * `Poll::Ready(None)` - no more files, limit reached, or a previous error.
/// * `Poll::Pending` - propagated by `ready!` when the open future or the
///   reader is not ready yet.
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
// No file in flight: begin opening the next one.
FileStreamState::Idle => {
// Opening time is measured from here until the open future resolves
// (stopped in the `Open` arm).
// NOTE(review): the `Ok(None)` and `Err` paths below return without
// stopping `time_opening` - confirm the timer tolerates being left
// running / restarted.
self.file_stream_metrics.time_opening.start();
match self.start_next_file().transpose() {
Ok(Some((future, partition_values))) => {
// No `return`: the loop re-enters and polls the future right away.
self.state = FileStreamState::Open {
future,
partition_values,
}
}
// Nothing left to open: end of stream.
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
// A file is being opened: wait for its reader. `ready!` returns
// `Poll::Pending` from this function if the future is not done.
FileStreamState::Open {
future,
partition_values,
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
// Move the partition values out of the state before it is overwritten.
let partition_values = mem::take(partition_values);
// include time needed to start opening in `start_next_file`
self.file_stream_metrics.time_opening.stop();
// Kick off opening the following file now; its future is stored in
// the `Scan` state and polled there while the current file is read.
let next = self.start_next_file().transpose();
self.file_stream_metrics.time_scanning_until_data.start();
self.file_stream_metrics.time_scanning_total.start();
match next {
Ok(Some((next_future, next_partition_values))) => {
self.state = FileStreamState::Scan {
partition_values,
reader,
next: Some((
NextOpen::Pending(next_future),
next_partition_values,
)),
};
}
// This is the last file: scan it with nothing queued behind it.
Ok(None) => {
self.state = FileStreamState::Scan {
reader,
partition_values,
next: None,
};
}
// Failing to start the next open is reported immediately,
// regardless of `self.on_error`.
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
Err(e) => {
self.file_stream_metrics.file_open_errors.add(1);
match self.on_error {
// Skip the file: go back to `Idle`, which restarts the opening
// timer and tries the next file.
OnError::Skip => {
self.file_stream_metrics.time_opening.stop();
self.state = FileStreamState::Idle
}
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
},
// A reader is active: pull the next batch from it.
FileStreamState::Scan {
reader,
partition_values,
next,
} => {
// We need to poll the next `FileOpenFuture` here to drive it forward
// Once it resolves, the outcome is cached as `NextOpen::Ready` so the
// future is not polled again.
if let Some((next_open_future, _)) = next {
if let NextOpen::Pending(f) = next_open_future {
if let Poll::Ready(reader) = f.as_mut().poll(cx) {
*next_open_future = NextOpen::Ready(reader);
}
}
}
// `ready!` returns `Poll::Pending` if the reader has no batch yet.
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
// Project the batch with this file's partition values, then enforce
// the remaining-row limit: while more rows remain than the batch
// holds, just decrement; otherwise truncate the batch to the rows
// still allowed, zero the counter and switch to the terminal
// `Limit` state.
let result = self
.pc_projector
.project(batch, partition_values)
.map_err(|e| ArrowError::ExternalError(e.into()))
.map(|batch| match &mut self.remain {
Some(remain) => {
if *remain > batch.num_rows() {
*remain -= batch.num_rows();
batch
} else {
let batch = batch.slice(0, *remain);
self.state = FileStreamState::Limit;
*remain = 0;
batch
}
}
None => batch,
});
if result.is_err() {
// If the partition value projection fails, this is not governed by
// the `OnError` behavior
self.state = FileStreamState::Error
}
// Resume the total-scan timer before handing the batch back; it is
// stopped again on the next reader result.
self.file_stream_metrics.time_scanning_total.start();
return Poll::Ready(Some(result.map_err(Into::into)));
}
Some(Err(err)) => {
self.file_stream_metrics.file_scan_errors.add(1);
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
match self.on_error {
// If `OnError::Skip` we skip the file as soon as we hit the first error
// `mem::take` leaves `None` behind in the state and hands us the
// queued next file, if there is one.
// NOTE(review): this transition block is identical to the one in the
// `None` arm below - candidate for a shared helper.
OnError::Skip => match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();
match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
// Already resolved: wrap the outcome in a completed future so
// the `Open` arm handles it like any other open result.
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(
reader,
)),
partition_values,
}
}
}
}
// No further file queued: end of stream.
None => return Poll::Ready(None),
},
OnError::Fail => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(err.into())));
}
}
}
// Current reader is exhausted: advance to the queued next file, if any.
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();
match future {
NextOpen::Pending(future) => {
self.state = FileStreamState::Open {
future,
partition_values,
}
}
// Already resolved: wrap the outcome in a completed future so
// the `Open` arm handles it like any other open result.
NextOpen::Ready(reader) => {
self.state = FileStreamState::Open {
future: Box::pin(std::future::ready(
reader,
)),
partition_values,
}
}
}
}
// No further file queued: end of stream.
None => return Poll::Ready(None),
}
}
}
}
// Terminal states: a previous error was already reported, or the row
// limit was reached. Nothing more is produced.
FileStreamState::Error | FileStreamState::Limit => {
return Poll::Ready(None)
}
}
}
}