in scripts/scaffold.py [0:0]
import json
from pathlib import Path

import click
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap

yaml = YAML()  # assumed: a round-trip ruamel.yaml instance, matching yaml.load/yaml.dump below

PROJECT_ROOT = Path(__file__).resolve().parent.parent  # assumed: scripts/ sits directly under the repo root
license_header = ""  # assumed: populated elsewhere with the repo's license boilerplate


def create_pipeline_yaml(dir: str):
    """Interactively scaffold a pipeline.yaml file under `dir`."""
    pipeline_yaml = {}
    resources = []
    sample_yaml = yaml.load((PROJECT_ROOT / "samples" / "pipeline.yaml").read_text())
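    # Assumed shape of samples/pipeline.yaml, inferred from the lookups this
    # function performs (resources[0]["table_id"], dag["tasks"][i]["operator"]);
    # keys beyond those two are illustrative only:
    #
    #   resources:
    #     - table_id: PLACEHOLDER    # one BigQuery table resource to clone
    #   dag:
    #     tasks:
    #       - operator: BashOperator # one sample task per supported operator
    #         ...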
    tables = click.prompt(
        "Input your BigQuery table name(s) required for your pipeline\n"
        "If you have multiple tables, please use a comma-separated list (e.g. table1, table2, table3)"
    )
    # Stamp each requested table name onto the sample BigQuery resource and
    # collect an independent copy per table.
    for table_name in tables.split(","):
        sample_yaml["resources"][0]["table_id"] = table_name.strip()
        bq_resource = sample_yaml["resources"][0]
        resources.append(bq_resource.copy())
    pipeline_yaml["resources"] = resources

    tasks = []
    airflow_operators = json.loads(
        (PROJECT_ROOT / "scripts" / "dag_imports.json").read_text()
    )
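    # Assumed shape of scripts/dag_imports.json: a map from Airflow major
    # version to the operators supported for that version, roughly
    #   {"1": {"BashOperator": ..., ...}, "2": {"BashOperator": ..., ...}}
    # Only the keys of the per-version map are used below, as the choices
    # offered to the user.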
    operators = airflow_operators["2"]  # operators available for Airflow 2
    while True:
        operator = click.prompt(
            "\nWhich operator would you like to add?",
            type=click.Choice(list(operators), case_sensitive=False),
        )
        # Find the sample task that uses the chosen operator and reuse it.
        sample_operators = [task["operator"] for task in sample_yaml["dag"]["tasks"]]
        operator_idx = sample_operators.index(operator)
        tasks.append(sample_yaml["dag"]["tasks"][operator_idx])
        if not click.confirm("\nWould you like to add another operator?"):
            break

    sample_yaml["dag"]["tasks"] = tasks
    pipeline_yaml["dag"] = sample_yaml["dag"]
    with open(f"{dir}/pipeline.yaml", "w") as pipeline_out:
        pipeline_out.write(license_header)
        yaml.dump(CommentedMap(pipeline_yaml), pipeline_out)
    click.echo(f"\n{dir}/pipeline.yaml has been created\n")
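
# A minimal usage sketch, not part of the original script: hypothetical CLI
# wiring so the function can be exercised directly. The `--dir` option name
# and this entry point are assumptions for illustration only.
@click.command()
@click.option("--dir", "dir", type=click.Path(file_okay=False, exists=True), default=".")
def main(dir: str):
    create_pipeline_yaml(dir)


if __name__ == "__main__":
    main()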