cicd-deployers/dataform_runner.py (155 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import argparse
import collections
import sys
import json
import re
from google.cloud import dataform_v1beta1
from google.cloud import asset_v1
df_client = dataform_v1beta1.DataformClient()
iam_client = asset_v1.AssetServiceClient()
def execute_workflow(repo_uri: str, compilation_result: str, tags: list):
"""Triggers a Dataform workflow execution based on a provided compilation result.
Args:
repo_uri (str): The URI of the Dataform repository.
compilation_result (str): The name of the compilation result to use.
Returns:
str: The name of the created workflow invocation.
"""
invocation_config = dataform_v1beta1.types.InvocationConfig(
included_tags=tags
)
request = dataform_v1beta1.CreateWorkflowInvocationRequest(
parent=repo_uri,
workflow_invocation=dataform_v1beta1.types.WorkflowInvocation(
compilation_result=compilation_result,
invocation_config=invocation_config
)
)
response = df_client.create_workflow_invocation(request=request)
name = response.name
logging.info(f'created workflow invocation {name}')
return name
def compile_workflow(repo_uri: str, branch: str):
"""Compiles a Dataform workflow using a specified Git branch.
Args:
repo_uri (str): The URI of the Dataform repository.
gcp_project (str): The GCP project ID.
tag (str): The dataform tag to compile.
branch (str): The Git branch to compile.
Returns:
str: The name of the created compilation result.
"""
request = dataform_v1beta1.CreateCompilationResultRequest(
parent=repo_uri,
compilation_result=dataform_v1beta1.types.CompilationResult(
git_commitish=branch
)
)
response = df_client.create_compilation_result(request=request)
name = response.name
logging.info(f'compiled workflow {name}')
return name
def get_workflow_status(workflow_invocation_name):
"""Monitors the status of a Dataform workflow invocation.
Args:
workflow_invocation_name (str): The ID of the workflow invocation.
df_client: The Dataform client object.
"""
while True:
request = dataform_v1beta1.GetWorkflowInvocationRequest(
name=workflow_invocation_name
)
response = df_client.get_workflow_invocation(request)
state = response.state.name
logging.info(f'workflow state: {state} for {workflow_invocation_name}')
if state == 'RUNNING':
time.sleep(4)
continue
if state in ('FAILED', 'CANCELING', 'CANCELLED'):
raise Exception(f'Error while running workflow {workflow_invocation_name}')
elif state == 'SUCCEEDED':
return
break
return
def run_workflow(gcp_project: str, project_num: str, location: str, repo_name: str, tags: list, execute: str,
branch: str):
"""Orchestrates the complete Dataform workflow process: compilation and execution.
Args:
gcp_project (str): The GCP project ID.
project_num (str): The GCP project Number.
location (str): The GCP region.
repo_name (str): The name of the Dataform repository.
tag (str): The target tags to compile and execute.
branch (str): The Git branch to use.
"""
repo_uri = f'projects/{gcp_project}/locations/{location}/repositories/{repo_name}'
compilation_result = compile_workflow(repo_uri, branch)
if execute:
workflow_invocation_name = execute_workflow(repo_uri, compilation_result, tags)
get_workflow_status(workflow_invocation_name)
print(json.dumps({}, indent=2))
def extract_config_name(file_path):
"""
Extracts the config name from a Dataform SQLX file.
Args:
file_path: Path to the SQLX file.
Returns:
The config name as a string, or None if not found.
"""
try:
with open(file_path, 'r') as f:
content = f.read()
match = re.search(r'config \{.*?name: "(.*?)"', content, re.DOTALL)
if match:
return match.group(1)
else:
logging.info(f"Config name not found in {file_path}.")
return None
except FileNotFoundError:
logging.info(f"File not found: {file_path}")
return None
def extract_iam_metadata(file_path):
"""
Extracts IAM metadata from a Dataform SQLX file.
Args:
file_path: Path to the SQLX file.
Returns:
A dictionary containing the IAM metadata, or None if not found.
"""
with open(file_path, 'r') as f:
content = f.read()
match = re.search(r'//iam_metadata: ({[\s\S]*?})', content)
if match:
json_str = match.group(1).replace("//", "")
try:
iam_metadata = json.loads(json_str)
return iam_metadata
except json.JSONDecodeError:
logging.info("Error decoding JSON metadata.")
return None
else:
logging.info("IAM metadata not found in the file.")
return None
def validate_service_account(project_id, service_account_email, required_role):
"""
Validates if a Google Cloud service account exists and has a specified role.
Args:
project_id: The ID of the Google Cloud project.
service_account_email: The email address of the service account.
required_role: The role the service account should have (e.g., "roles/storage.objectAdmin").
Returns:
True if the service account exists and has the role, False otherwise.
"""
# Construct the service account resource name
resource_name = f"//iam.googleapis.com/projects/{project_id}/serviceAccounts/{service_account_email}"
# Analyze IAM policy for the service account
response = iam_client.analyze_iam_policy(
request={
"analysis_query": {
"scope": f"projects/{project_id}",
"resource_selector": {"full_resource_name": resource_name},
"identity_selector": {"identity": f"serviceAccount:{service_account_email}"}
}
}
)
# Check if the required role is in the policy bindings
for binding in response.main_analysis.analysis_results[0].iam_binding.bindings:
if required_role in binding.role:
return True
return False
logging.info(f"Service account {service_account_email} does not have the role {required_role} in project {project_id}.")
return False
def main(args: collections.abc.Sequence[str]) -> int:
"""The main function parses command-line arguments and calls the run_workflow function to execute the complete Dataform workflow.
To run the script, provide the required command-line arguments:
python intro.py --project_id your_project_id --location your_location --repository your_repo_name --dataset your_bq_dataset --branch your_branch
"""
parser = argparse.ArgumentParser(description="Dataform Workflows runner")
parser.add_argument("--project_id",
type=str,
required=True,
help="The GCP project ID where the Dataform code will be deployed.")
parser.add_argument("--project_number",
type=str,
required=True,
help="The GCP project Number where the Dataform code will be deployed.")
parser.add_argument("--location",
type=str,
required=True,
help="The location of the Dataform repository.")
parser.add_argument("--repository",
type=str,
required=True,
help="The name of the Dataform repository to compile and run")
parser.add_argument("--tags",
nargs="*", # 0 or more values expected => creates a list
type=str,
required=True,
help="The target tags to compile and execute")
parser.add_argument("--execute",
type=str,
required=True,
help="Control if dataform repository will be executed or compiled only.")
parser.add_argument("--branch",
type=str,
required=True,
help="The branch of the Dataform repository to use.")
params = parser.parse_args(args)
project_id = str(params.project_id)
project_number = str(params.project_number)
location = str(params.location)
repository = str(params.repository)
execute = str(params.execute)
tags = list(params.tags)
branch = str(params.branch)
run_workflow(gcp_project=project_id,
project_num=project_number,
location=location,
repo_name=repository,
tags=tags,
execute=execute,
branch=branch)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))