in src/fmeval/transforms/transform_pipeline.py [0:0]
def execute(self, dataset: ray.data.Dataset) -> ray.data.Dataset:
    """Apply each Transform in ``self.transforms`` to ``dataset``, in order.

    Steps that are ``BatchedTransform`` instances are applied with
    ``Dataset.map_batches``; all other steps are applied with ``Dataset.map``.
    In both cases Ray is given the step's class together with the step's
    stored ``args``/``kwargs`` as constructor arguments, and a concurrency
    range of ``(1, get_num_actors())``. The output of every step is
    materialized before the next step runs.

    :param dataset: A Ray Dataset.
    :returns: The resulting Ray Dataset after all Transforms have been applied.
    """
    result = dataset
    for step in self.transforms:
        # Options only the batched path needs; empty for per-record transforms.
        batched_options = {}
        if isinstance(step, BatchedTransform):
            apply_step = result.map_batches
            # A batch_size of -1 is translated to Ray's "default" batch size.
            batched_options["batch_size"] = "default" if step.batch_size == -1 else step.batch_size
        else:
            apply_step = result.map
        result = apply_step(
            step.__class__,
            fn_constructor_args=step.args,
            fn_constructor_kwargs=step.kwargs,
            concurrency=(1, get_num_actors()),
            **batched_options,
        ).materialize()
    return result