in src/stage_reader.rs [95:177]
/// Builds the record batch stream for `partition` of this stage.
///
/// The Flight clients registered under `(self.stage_id, partition)` are looked up in the
/// `ServiceClients` extension of the session config. When the returned stream is polled,
/// a `do_get` carrying a ticket that encodes `partition` is issued on each client in turn,
/// and the resulting Flight streams are merged through a `CombinedRecordBatchStream`.
///
/// # Errors
///
/// Returns an internal DataFusion error when the `ServiceClients` extension is absent from
/// the session config, or when it holds no entry for `(self.stage_id, partition)`.
///
/// Failures that occur while the stream is being polled are reported as `Err` items of the
/// stream: a failing `do_get` yields one error and ends the stream without producing any
/// batches; errors raised while consuming a Flight stream are mapped to internal errors.
fn execute(
    &self,
    partition: usize,
    context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let name = format!("RayStageReaderExec[{}-{}]:", self.stage_id, partition);
    trace!("{name} execute");

    // `ok_or_else` defers building the error (and its formatted message) to the failure
    // path; `ok_or` would format it on every call.
    let service_clients = context
        .session_config()
        .get_extension::<ServiceClients>()
        .ok_or_else(|| internal_datafusion_err!("{name} Flight Client not in context"))?;
    // Borrow the map from the extension handle; no extra clone of the handle is needed.
    let client_map = &service_clients.0;
    trace!("{name} client_map keys {:?}", client_map.keys());

    // Build fresh clients from the shared ones while the lock is held; the guard is a
    // temporary of this statement and is released once the vector has been collected.
    let clients = client_map
        .get(&(self.stage_id, partition))
        .ok_or_else(|| {
            internal_datafusion_err!(
                "{} No flight clients found for {}:{}, have {:?}",
                name,
                self.stage_id,
                partition,
                client_map.keys()
            )
        })?
        .lock()
        .iter()
        .map(|c| FlightClient::new_from_inner(c.inner().clone()))
        .collect::<Vec<_>>();

    let ftd = FlightTicketData {
        dummy: false,
        partition: partition as u64,
    };
    let ticket = Ticket {
        ticket: ftd.encode_to_vec().into(),
    };

    let schema = self.schema.clone();

    let stream = async_stream::stream! {
        let mut error = false;
        let mut streams = vec![];
        for mut client in clients {
            // Per-iteration copy: the `map_err` closure below takes ownership of it.
            let name = name.clone();
            trace!("{name} Getting flight stream" );
            match client.do_get(ticket.clone()).await {
                Ok(flight_stream) => {
                    trace!("{name} Got flight stream. headers:{:?}", flight_stream.headers());
                    let rbr_stream = RecordBatchStreamAdapter::new(
                        schema.clone(),
                        flight_stream.map_err(move |e| {
                            internal_datafusion_err!("{} Error consuming flight stream: {}", name, e)
                        }),
                    );
                    streams.push(Box::pin(rbr_stream) as SendableRecordBatchStream);
                },
                Err(e) => {
                    error = true;
                    yield internal_err!("{} Error getting flight stream: {}", name, e);
                    // Once one client has failed no batches are ever produced, so
                    // contacting the remaining clients would only open streams that
                    // get dropped unused.
                    break;
                }
            }
        }
        if !error {
            let mut combined = CombinedRecordBatchStream::new(schema.clone(), streams);
            while let Some(maybe_batch) = combined.next().await {
                yield maybe_batch;
            }
        }
    };

    Ok(Box::pin(RecordBatchStreamAdapter::new(
        self.schema.clone(),
        stream,
    )))
}