in datafusion_ray/core.py [0:0]
def create_ray_stages(self):
    """Register this query's stages with the supervisor.

    Builds one ``StageData`` for every partition group of every stage
    returned by ``self.stages()``, passes the complete list to
    ``self.supervisor.new_query.remote``, and hands the resulting
    reference to ``wait_for`` / ``call_sync``.
    """
    # A stage object from self.stages() describes a single numbered stage,
    # but the supervisor wants one StageData per actor that will be created.
    # Each partition group therefore gets its own entry, which is why the
    # comprehension walks both the stages and their partition groups.
    per_actor_stage_data = [
        StageData(
            stage.stage_id,
            stage.plan_bytes(),
            group,
            stage.child_stage_ids,
            stage.num_output_partitions,
            stage.full_partitions,
        )
        for stage in self.stages()
        for group in stage.partition_groups
    ]

    # NOTE(review): call_sync presumably blocks until the wait finishes — confirm.
    new_query_ref = self.supervisor.new_query.remote(per_actor_stage_data)
    call_sync(wait_for([new_query_ref], "creating ray stages"))