fn execute()

in src/stage_reader.rs [95:177]


    fn execute(
        &self,
        partition: usize,
        context: std::sync::Arc<datafusion::execution::TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let name = format!("RayStageReaderExec[{}-{}]:", self.stage_id, partition);
        trace!("{name} execute");
        let client_map = &context
            .session_config()
            .get_extension::<ServiceClients>()
            .ok_or(internal_datafusion_err!(
                "{name} Flight Client not in context"
            ))?
            .clone()
            .0;

        trace!("{name} client_map keys {:?}", client_map.keys());

        let clients = client_map
            .get(&(self.stage_id, partition))
            .ok_or(internal_datafusion_err!(
                "{} No flight clients found for {}:{}, have {:?}",
                name,
                self.stage_id,
                partition,
                client_map.keys()
            ))?
            .lock()
            .iter()
            .map(|c| {
                let inner_clone = c.inner().clone();
                FlightClient::new_from_inner(inner_clone)
            })
            .collect::<Vec<_>>();

        let ftd = FlightTicketData {
            dummy: false,
            partition: partition as u64,
        };

        let ticket = Ticket {
            ticket: ftd.encode_to_vec().into(),
        };

        let schema = self.schema.clone();

        let stream = async_stream::stream! {
            let mut error = false;

            let mut streams = vec![];
            for mut client in clients {
                let name = name.clone();
                trace!("{name} Getting flight stream" );
                match client.do_get(ticket.clone()).await {
                    Ok(flight_stream) => {
                        trace!("{name} Got flight stream. headers:{:?}", flight_stream.headers());
                        let rbr_stream = RecordBatchStreamAdapter::new(schema.clone(),
                            flight_stream
                                .map_err(move |e| internal_datafusion_err!("{} Error consuming flight stream: {}", name, e)));

                        streams.push(Box::pin(rbr_stream) as SendableRecordBatchStream);
                    },
                    Err(e) => {
                        error = true;
                        yield internal_err!("{} Error getting flight stream: {}", name, e);
                    }
                }
            }
            if !error {
                let mut combined = CombinedRecordBatchStream::new(schema.clone(),streams);

                while let Some(maybe_batch) = combined.next().await {
                    yield maybe_batch;
                }
            }

        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            stream,
        )))
    }