in datafusion_ray/core.py [0:0]
def collect(self) -> list[pa.RecordBatch]:
    """Run the query if needed and return the resulting record batches.

    The batches are stored on ``self._batches`` and handed back directly on
    later calls. The cache check is a truthiness test, so an empty (or
    unset) ``self._batches`` causes the whole pipeline to run again.

    Returns:
        The list of items yielded by the final-stage reader.
    """
    # Fast path: a previous call already materialized a non-empty result.
    if self._batches:
        return self._batches

    # Time the stage-planning call purely for the debug log.
    started = time.time()
    self.stages()
    finished = time.time()
    log.debug(f"creating stages took {finished - started}s")

    # The stage with the highest id is read as the final stage.
    stage_ids = [stage.stage_id for stage in self._stages]
    last_stage_id = max(stage_ids)
    log.debug(f"last stage is {last_stage_id}")

    self.create_ray_stages()

    # Ask the supervisor where the final stage can be reached; ray.get
    # blocks until the remote call has produced its answer.
    addrs_ref = self.supervisor.get_stage_addrs.remote(last_stage_id)
    last_stage_addrs = ray.get(addrs_ref)
    log.debug(f"last stage addrs {last_stage_addrs}")

    # Only the first returned address is used to read the final stage.
    first_addr = last_stage_addrs[0]
    reader = self.df.read_final_stage(last_stage_id, first_addr)
    log.debug("got reader")

    # Drain the reader fully so the result can be cached and re-returned.
    self._batches = list(reader)
    return self._batches