in src/dataframe.rs [702:732]
/// Export this DataFrame via the Arrow PyCapsule `__arrow_c_stream__`
/// protocol, optionally projecting onto a caller-requested schema.
///
/// The DataFrame is fully collected up front; the resulting record batches
/// are wrapped in a `RecordBatchIterator` and handed to Python as an
/// `arrow_array_stream` capsule.
fn __arrow_c_stream__<'py>(
    &'py mut self,
    py: Python<'py>,
    requested_schema: Option<Bound<'py, PyCapsule>>,
) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
    // Materialize every partition now; the exported stream replays them.
    let collected = wait_for_future(py, self.df.as_ref().clone().collect())?;
    let mut target_schema: Schema = self.df.schema().to_owned().into();

    // Honor a caller-supplied schema by projecting each batch onto it.
    let batches = match requested_schema {
        Some(capsule) => {
            validate_pycapsule(&capsule, "arrow_schema")?;
            // SAFETY: the capsule was validated above to carry an
            // `arrow_schema` payload, so its pointer is an FFI_ArrowSchema.
            let ffi_schema = unsafe { capsule.reference::<FFI_ArrowSchema>() };
            target_schema = project_schema(target_schema, Schema::try_from(ffi_schema)?)?;
            collected
                .into_iter()
                .map(|batch| record_batch_into_schema(batch, &target_schema))
                .collect::<Result<Vec<RecordBatch>, ArrowError>>()?
        }
        None => collected,
    };

    // Wrap the batches in a reader and export it through the Arrow C
    // stream FFI as an `arrow_array_stream` capsule.
    let reader: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
        batches.into_iter().map(Ok),
        Arc::new(target_schema),
    ));
    PyCapsule::new(
        py,
        FFI_ArrowArrayStream::new(reader),
        Some(CString::new("arrow_array_stream").unwrap()),
    )
    .map_err(PyDataFusionError::from)
}