in code/workflow/implementations/autopilot/bp_init_stage.py [0:0]
def create_processing_inputs(self, processing_dir, flow, flow_uri):
    """Build the list of processing inputs for the Data Wrangler processing job."""
    processing_inputs = []
    # The flow definition itself is always the first processing input.
    flow_processing_input = self.create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)
    # Add one processing input per dataset referenced by the flow's nodes,
    # dispatching on the dataset's source type.
    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]
            if source_type == "S3":
                processing_inputs.append(self.create_s3_processing_input(data_def, name, processing_dir))
            elif source_type == "Athena":
                processing_inputs.append(self.create_athena_processing_input(data_def, name, processing_dir))
            elif source_type == "Redshift":
                processing_inputs.append(self.create_redshift_processing_input(data_def, name, processing_dir))
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")
    return processing_inputs
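
For context, a minimal sketch of what one of the dispatched helpers might look like. This is an assumption, not the module's actual implementation: it uses the SageMaker Python SDK's ProcessingInput class and assumes the S3 dataset definition stores its location under an "s3ExecutionContext"/"s3Uri" key, as in exported Data Wrangler flow files; the real helper and flow schema live elsewhere in this module.

from sagemaker.processing import ProcessingInput

def create_s3_processing_input(self, data_def, name, processing_dir):
    # Mount the S3 dataset into the processing container under a
    # per-dataset subdirectory of the shared processing directory.
    return ProcessingInput(
        source=data_def["s3ExecutionContext"]["s3Uri"],  # assumed flow schema
        destination=f"{processing_dir}/{name}",
        input_name=name,
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )

Each ProcessingInput returned here, along with the flow-notebook input, ends up in the list that create_processing_inputs hands to the processing job.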