in functions/data-processing-engines/dataform-tag-executor/main.py [0:0]
def execute_workflow(repo_uri: str, compilation_result: str, tags: list):
    """Create a Dataform workflow invocation from an existing compilation result.

    Args:
        repo_uri (str): The URI of the Dataform repository; used as the
            ``parent`` of the create request.
        compilation_result (str): The name of the compilation result to use.
        tags (list): Values forwarded as ``included_tags`` of the invocation
            config (presumably restricts the run to actions carrying these
            tags -- confirm against the Dataform API reference).

    Returns:
        str: The name of the created workflow invocation.
    """
    # Build the request from the inside out: config -> invocation -> request.
    tag_filter = dataform_v1beta1.types.InvocationConfig(included_tags=tags)
    invocation = dataform_v1beta1.types.WorkflowInvocation(
        compilation_result=compilation_result,
        invocation_config=tag_filter,
    )
    create_request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=repo_uri,
        workflow_invocation=invocation,
    )

    # df_client is the module-level Dataform client defined elsewhere in this file.
    name = df_client.create_workflow_invocation(request=create_request).name
    logging.info(f'created workflow invocation {name}')
    return name