def create_ray_stages()

in datafusion_ray/core.py [0:0]


    def create_ray_stages(self):
        """Register this query's stages with the supervisor actor.

        Builds one ``StageData`` per partition group of every stage returned
        by ``self.stages()``. It submits the whole list in a single
        ``self.supervisor.new_query.remote(...)`` call. It then waits on the
        returned ref through ``call_sync(wait_for(...))`` before returning.
        This presumably blocks until the supervisor has processed the query;
        confirm against ``wait_for``.

        Returns:
            None.
        """
        stage_datas = []

        # note, whereas the PyDataFrameStage object contained in self.stages()
        # holds information for a numbered stage,
        # when we tell the supervisor about our query, it wants a StageData
        # object per actor that will be created.  Hence the loop over partition_groups
        for stage in self.stages():
            partition_groups = stage.partition_groups
            if not partition_groups:
                # No StageData (actor) is produced for this stage, so there is
                # no reason to serialize its plan.
                continue

            # plan_bytes() takes no per-group argument, so serialize the plan
            # once per stage instead of once per partition group.
            plan_bytes = stage.plan_bytes()
            stage_datas.extend(
                StageData(
                    stage.stage_id,
                    plan_bytes,
                    partition_group,
                    stage.child_stage_ids,
                    stage.num_output_partitions,
                    stage.full_partitions,
                )
                for partition_group in partition_groups
            )

        ref = self.supervisor.new_query.remote(stage_datas)
        call_sync(wait_for([ref], "creating ray stages"))