functions/orchestration-helpers/intermediate/main.py (162 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import google.auth import urllib import urllib.error import urllib.request import json import logging import google.auth.transport.requests import functions_framework import google.oauth2.id_token from google.cloud import bigquery from datetime import datetime, timedelta from google.cloud import error_reporting from enum import Enum from urllib import parse # Access environment variables WORKFLOW_CONTROL_PROJECT_ID = os.environ.get('WORKFLOW_CONTROL_PROJECT_ID') WORKFLOW_CONTROL_DATASET_ID = os.environ.get('WORKFLOW_CONTROL_DATASET_ID') WORKFLOW_CONTROL_TABLE_ID = os.environ.get('WORKFLOW_CONTROL_TABLE_ID') # define clients bq_client = bigquery.Client(project=WORKFLOW_CONTROL_PROJECT_ID) error_client = error_reporting.Client() logger = logging.getLogger() logger.setLevel(logging.DEBUG) class JobStatus(Enum): SUCCESS = ("DONE", "SUCCESS", "SUCCEEDED", "JOB_STATE_DONE") RUNNING = ("PENDING", "RUNNING", "JOB_STATE_QUEUED", "JOB_STATE_RUNNING", "JOB_STATE_PENDING") @functions_framework.http def main(request): """ Main function, likely triggered by an HTTP request from cloud workflows. Acts as an intermediary between workflows and executor functions, taking care of the non-functional requirements as process metadata creation , error handling, notifications management, checkpoint management, etc. Args: request: The incoming HTTP request object. Returns: str: The status of the query execution or the job ID (if asynchronous). """ request_json = request.get_json() print("event: " + str(request_json)) try: if request_json and 'call_type' in request_json: call_type = request_json['call_type'] else: Exception("No call type!") if call_type == "get_id": get_id_result = evaluate_error(call_custom_function(request_json, None)) status = 'started' if is_valid_step_id(get_id_result) else 'failed_start' log_step_bigquery(request_json, status) return get_id_result elif call_type == "get_status": if request_json and 'async_job_id' in request_json: status = evaluate_error(call_custom_function(request_json, request_json['async_job_id'])) else: Exception("Job Id not received!") return status else: raise Exception("Invalid call type!") except Exception as ex: exception_message = "Exception : " + repr(ex) # TODO register error in checkpoint table error_client.report_exception() logger.error(exception_message) print(RuntimeError(repr(ex))) return exception_message, 500 def is_valid_step_id(step_id): """Checks if a step ID starts with "aef_" or "aef-". Args: step_id: The step ID string to check. Returns: True if the step ID is valid, False otherwise. """ pattern = r"^aef[_-]" # Use a regular expression for more flexibility return bool(re.match(pattern, step_id)) def evaluate_error(message): """ Evaluates if a message has an error Args: str: message to evaluate Returns: raise Exception if the word "exception" found in message str: original message coming from executor functions :param message: """ if 'error' in message.lower() or 'exception' in message.lower(): raise Exception(message) return message def log_step_bigquery(request_json, status): """ Logs a new entry in workflows bigquery table on finished or started step, ether it failed of succeed Args: status: status of the execution request_json: event object containing info to log """ target_function_url = request_json['function_url_to_call'] current_datetime = datetime.now().isoformat() status_to_error_code = { 'success': '0', 'started': '0', 'failed_start': '1', 'failed': '2' } data = { 'workflow_execution_id': request_json['execution_id'], 'workflow_name': request_json['workflow_name'], 'job_name': request_json['job_name'], 'job_status': status, 'timestamp': current_datetime, 'error_code': status_to_error_code.get(status, '2'), 'job_params': str(request_json), 'log_path': get_cloud_logging_url(target_function_url), 'retry_count': 0 # TODO } workflows_control_table = bq_client.dataset(WORKFLOW_CONTROL_DATASET_ID).table(WORKFLOW_CONTROL_TABLE_ID) errors = bq_client.insert_rows_json(workflows_control_table, [data]) if not errors: print("New row has been added.") else: raise Exception("Encountered errors while inserting row: {}".format(errors)) def get_cloud_logging_url(target_function_url): """ Retrieves the Cloud Logging URL for the most recent execution of a specified Google Cloud Function. Args: target_function_url (str): The URL of the target Google Cloud Function. Returns: str: The Cloud Logging URL for the most recent execution of the function, or None if no matching log entries are found. """ date = datetime.utcnow() - timedelta(minutes=59) function_name = target_function_url.split('/')[-1] # Remove newline characters and extra whitespace from the filter string filter_str = f""" (resource.type="cloud_function" AND resource.labels.function_name="{function_name}") OR (resource.type="cloud_run_revision" AND resource.labels.function_name="{function_name}") AND timestamp>="{date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")}" """ filter_str = ' '.join(filter_str.split()) # Remove extra whitespace # Then apply the double URL encoding (TODO enhance encoding to get URL link) encoded_filter = parse.quote(parse.quote(filter_str, safe=''), safe='') encoded_filter = ( encoded_filter .replace('%253D%2522', '%20%3D%20%22') .replace("%2522%2520", "%22%0A%20") .replace("%2520", "%20") .replace("%2522%2529%20", "%22%2529%0A%20") .replace("%253E%20%3D%20%", "%3E%3D%") .replace("%253A", ":") .replace("Z%2522", "Z%22") ) base_url = "https://console.cloud.google.com/logs/query" query_params = f";query={encoded_filter}" log_url = f"{base_url}{query_params}" print("Cloud Logging URL query:", log_url) return log_url def call_custom_function(request_json, async_job_id): """ calls an executor function passed by parameter Args: request_json: json input object with parameters async_job_id: if filled, function ask by the execution status. if not, launches the execution for the first time Returns: raise Exception if the word "exception" is found in message str: original message coming from executor functions """ workflow_name = request_json['workflow_name'] job_name = request_json['job_name'] workflow_properties = request_json.get('workflow_properties') step_properties = request_json.get('step_properties') workflow_properties = join_properties(workflow_properties, step_properties) params = { "workflow_properties": workflow_properties, "workflow_name": workflow_name, "job_name": job_name, "query_variables": { "start_date": "'" + request_json['query_variables']['start_date'] + "'", "end_date": "'" + request_json['query_variables']['end_date'] + "'" } } if async_job_id: params['job_id'] = async_job_id target_function_url = request_json['function_url_to_call'] try: req = urllib.request.Request(target_function_url, data=json.dumps(params).encode("utf-8")) auth_req = google.auth.transport.requests.Request() id_token = google.oauth2.id_token.fetch_id_token(auth_req, target_function_url) req.add_header("Authorization", f"Bearer {id_token}") req.add_header("Content-Type", "application/json") response = urllib.request.urlopen(req) response = response.read() print('response: ' + str(response)) final_response = '' # Handle the response decoded_response = response.decode("utf-8") if async_job_id is None and is_valid_step_id(decoded_response): final_response = decoded_response elif decoded_response in JobStatus.SUCCESS.value: final_response = "success" log_step_bigquery(request_json, final_response) elif decoded_response in JobStatus.RUNNING.value: final_response = "running" else: # FAILURE final_response = f"Exception calling target function {target_function_url.split('/')[-1]}:{decoded_response}" log_step_bigquery(request_json, "failed") print("final response: " + final_response) return final_response except (urllib.error.HTTPError) as e: print('Exception: ' + repr(e)) raise Exception( "Unexpected error in custom function: " + target_function_url.split('/')[-1] + ":" + repr(e)) def join_properties(workflow_properties, step_properties): """ receives 2 dictionaries if exists, and join step properties into workflow properties, overriding props if necessary. Args: workflow_properties: properties passed in firestore to the workflow step_properties: properties configured in each step in functional json workflow definition. can overwrite workflow properties Returns: final properties dictionary """ # Handle None or empty inputs workflow_props = {} if workflow_properties: workflow_props = json.loads(workflow_properties) if isinstance(workflow_properties, str) else workflow_properties step_props = {} if step_properties: step_props = json.loads(step_properties) if isinstance(step_properties, str) else step_properties return {**workflow_props, **step_props}