in cicd-deployers/dataform_runner.py [0:0]
def main(args: collections.abc.Sequence[str]) -> int:
    """Parse command-line arguments and run the Dataform workflow.

    Every flag below is required. Example invocation:

        python dataform_runner.py --project_id your_project_id \
            --project_number your_project_number --location your_location \
            --repository your_repo_name --tags tag_one tag_two \
            --execute your_execute_value --branch your_branch

    Args:
        args: Command-line tokens to parse. They are handed to argparse
            as-is, so the program name must not be included.

    Returns:
        0 once run_workflow has returned.

    Raises:
        SystemExit: Raised by argparse when a required flag is missing or
            an unknown argument is supplied.
    """
    parser = argparse.ArgumentParser(description="Dataform Workflows runner")
    parser.add_argument("--project_id",
                        type=str,
                        required=True,
                        help="The GCP project ID where the Dataform code will be deployed.")
    parser.add_argument("--project_number",
                        type=str,
                        required=True,
                        help="The GCP project Number where the Dataform code will be deployed.")
    parser.add_argument("--location",
                        type=str,
                        required=True,
                        help="The location of the Dataform repository.")
    parser.add_argument("--repository",
                        type=str,
                        required=True,
                        help="The name of the Dataform repository to compile and run")
    # The flag itself is mandatory, but nargs="*" lets it carry zero values,
    # in which case an empty list is forwarded.
    parser.add_argument("--tags",
                        nargs="*",
                        type=str,
                        required=True,
                        help="The target tags to compile and execute")
    # Forwarded as a raw string; interpreting it is left to run_workflow.
    parser.add_argument("--execute",
                        type=str,
                        required=True,
                        help="Control if dataform repository will be executed or compiled only.")
    parser.add_argument("--branch",
                        type=str,
                        required=True,
                        help="The branch of the Dataform repository to use.")
    params = parser.parse_args(args)

    # argparse already produces str values (and a list for --tags), so the
    # parsed attributes are passed through without further conversion.
    run_workflow(gcp_project=params.project_id,
                 project_num=params.project_number,
                 location=params.location,
                 repo_name=params.repository,
                 tags=params.tags,
                 execute=params.execute,
                 branch=params.branch)

    # Honour the declared ``-> int`` contract so that callers using
    # ``sys.exit(main(...))`` receive an explicit success status.
    return 0