in src/dataset_exec.rs [182:237]
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
    let batch_size = context.session_config().batch_size();
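    // Everything below touches Python objects, so it runs while holding the GIL.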
    Python::with_gil(|py| {
        let dataset = self.dataset.as_ref(py);
        let fragments = self.fragments.as_ref(py);
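        // Look up the PyArrow fragment that backs this DataFusion partition.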
        let fragment = fragments
            .get_item(partition)
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
        // Pass the dataset schema to the scanner so the fragment's schema is unified with the dataset's, per the PyArrow docs
        let dataset_schema = dataset
            .getattr("schema")
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
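        // Assemble the keyword arguments for Fragment.scanner(): column
        // projection, optional filter expression, and batch size.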
        let kwargs = PyDict::new(py);
        kwargs
            .set_item("columns", self.columns.clone())
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
        kwargs
            .set_item(
                "filter",
                // `clone_ref` takes a new reference to the Python filter
                // expression; a `None` filter becomes Python's `None`.
                self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
            )
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
        kwargs
            .set_item("batch_size", batch_size)
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
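        // Python equivalent: fragment.scanner(dataset_schema, columns=...,
        // filter=..., batch_size=...).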
        let scanner = fragment
            .call_method("scanner", (dataset_schema,), Some(kwargs))
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
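        // The scanner's projected_schema reflects the column projection;
        // `PyArrowType` converts it into a Rust Arrow schema across the FFI boundary.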
        let schema: SchemaRef = Arc::new(
            scanner
                .getattr("projected_schema")
                .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
        );
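        // Scanner.to_batches() yields pyarrow.RecordBatch values lazily; take a
        // Python iterator over them so batches are pulled one at a time.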
        let record_batches: &PyIterator = scanner
            .call_method0("to_batches")
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
            .iter()
            .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
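        // Bridge the Python iterator into a Rust iterator of RecordBatch results.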
        let record_batches = PyArrowBatchesAdapter {
            batches: record_batches.into(),
        };
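        // Expose the batches as DataFusion's SendableRecordBatchStream,
        // converting any per-batch error into a DataFusionError.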
        let record_batch_stream = stream::iter(record_batches);
        let record_batch_stream: SendableRecordBatchStream = Box::pin(
            RecordBatchStreamAdapter::new(schema, record_batch_stream.map_err(|e| e.into())),
        );
        Ok(record_batch_stream)
    })
}
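
// For context, a minimal sketch of what `PyArrowBatchesAdapter` has to do. This
// is an illustration under assumptions, not the definition from this file: hold
// a `Py<PyIterator>`, re-acquire the GIL on each `next()`, and pull the next
// pyarrow.RecordBatch across the FFI boundary as a Rust `RecordBatch`. It
// assumes `RecordBatch`, `ArrowError`, `PyArrowType`, and pyo3's `Py`,
// `PyIterator`, and `Python` are in scope, as they are in this file.

struct PyArrowBatchesAdapter {
    batches: Py<PyIterator>,
}

impl Iterator for PyArrowBatchesAdapter {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        Python::with_gil(|py| {
            // `&PyIterator` implements `Iterator`, so a mutable binding suffices.
            let mut batches = self.batches.as_ref(py);
            Some(
                batches
                    .next()?
                    .and_then(|batch| Ok(batch.extract::<PyArrowType<RecordBatch>>()?.0))
                    // Surface Python errors as ArrowError, which the stream
                    // adapter above converts into a DataFusionError.
                    .map_err(|err| ArrowError::ExternalError(Box::new(err))),
            )
        })
    }
}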