in functions/orchestration-helpers/pipeline-executor/main.py [0:0]
def call_workflows(workflows_name, start_date, end_date,
                   validation_date_pattern, workflow_properties,
                   same_day_execution):
    """
    Launch an execution of the Cloud Workflows pipeline given by name.

    When ``start_date`` is None the call is treated as not manually
    triggered and both dates are obtained from ``process_dates``. When
    ``end_date`` is still None afterwards it falls back to ``start_date``.

    Args:
        workflows_name: name of the cloud workflows to execute
        start_date: start date passed by parameter to the workflows pipeline ( normally a data pipeline )
        end_date: end date passed by parameter to the workflows pipeline ( normally a data pipeline )
        validation_date_pattern: python date pattern format to apply to start and end dates
            (forwarded to ``process_dates``)
        workflow_properties: custom properties passed to the cloud workflows; a ``str`` value
            is parsed as JSON before being forwarded, any other value is forwarded as is
        same_day_execution: can be YESTERDAY, TODAY or YESTERDAY_TODAY indicating dates that should be
            passed in start and end dates, if not received by parameter (forwarded to ``process_dates``)
    Returns:
        execution_id: cloud workflows unique execution identifier (last segment of the
            created execution's resource name)
    """
    print("Launching Custom Workflow.....")
    if start_date is None:  # it means is not done manually
        start_date, end_date = process_dates(validation_date_pattern, same_day_execution)
    if end_date is None:
        end_date = start_date
    if isinstance(workflow_properties, str):
        workflow_properties = json.loads(workflow_properties)
    arguments = {
        "workflow_name": workflows_name,
        "query_variables": {
            "start_date": start_date,
            "end_date": end_date,
        },
        "workflow_properties": workflow_properties
    }
    # print() does not interpolate logging-style "%s" arguments, so the
    # message is formatted explicitly instead of emitting a literal "%s".
    print(f"Cloud Workflows input params: {arguments}")
    # The workflow receives its input as a single JSON-encoded string.
    execution = Execution(argument=json.dumps(arguments))
    # Construct the fully qualified location path.
    parent = workflows_client.workflow_path(WORKFLOW_CONTROL_PROJECT_ID, WORKFLOWS_LOCATION, workflows_name)
    # Execute the workflow.
    response = execution_client.create_execution(parent=parent, execution=execution)
    # The execution id is the last "/"-separated segment of the resource name.
    execution_id = response.name.split("/")[-1]
    print(f"Created execution: {execution_id}")
    return execution_id