fn execute()

in src/dataset_exec.rs [182:237]


    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let batch_size = context.session_config().batch_size();
        Python::with_gil(|py| {
            let dataset = self.dataset.as_ref(py);
            let fragments = self.fragments.as_ref(py);
            let fragment = fragments
                .get_item(partition)
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;

            // We need to pass the dataset schema to unify the fragment and dataset schema per PyArrow docs
            let dataset_schema = dataset
                .getattr("schema")
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            let kwargs = PyDict::new(py);
            kwargs
                .set_item("columns", self.columns.clone())
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            kwargs
                .set_item(
                    "filter",
                    self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
                )
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            kwargs
                .set_item("batch_size", batch_size)
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            let scanner = fragment
                .call_method("scanner", (dataset_schema,), Some(kwargs))
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            let schema: SchemaRef = Arc::new(
                scanner
                    .getattr("projected_schema")
                    .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
                    .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
            );
            let record_batches: &PyIterator = scanner
                .call_method0("to_batches")
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
                .iter()
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;

            let record_batches = PyArrowBatchesAdapter {
                batches: record_batches.into(),
            };

            let record_batch_stream = stream::iter(record_batches);
            let record_batch_stream: SendableRecordBatchStream = Box::pin(
                RecordBatchStreamAdapter::new(schema, record_batch_stream.map_err(|e| e.into())),
            );
            Ok(record_batch_stream)
        })
    }