functions/orchestration-helpers/scheduling/main.py (120 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import json import logging import google.cloud.logging import functions_framework import google.oauth2.id_token from google.cloud import error_reporting from cloudevents.http import CloudEvent from google.cloud import firestore from google.events.cloud import firestore as firestoredata from google.cloud import scheduler_v1 # Access environment variables WORKFLOW_SCHEDULING_PROJECT_ID = os.environ.get('WORKFLOW_SCHEDULING_PROJECT_ID') WORKFLOW_SCHEDULING_PROJECT_NUMBER = os.environ.get('WORKFLOW_SCHEDULING_PROJECT_NUMBER') WORKFLOW_SCHEDULING_PROJECT_REGION = os.environ.get('WORKFLOW_SCHEDULING_PROJECT_REGION') WORKFLOW_SCHEDULING_FIRESTORE_COLLECTION = os.environ.get('WORKFLOW_SCHEDULING_FIRESTORE_COLLECTION') PIPELINE_EXECUTION_FUNCTION_NAME = os.environ.get('PIPELINE_EXECUTION_FUNCTION_NAME') # define clients error_client = error_reporting.Client() client = google.cloud.logging.Client() client.setup_logging() logger = logging.getLogger() logger.setLevel(logging.DEBUG) firestore_client = firestore.Client() scheduler_client = scheduler_v1.CloudSchedulerClient() @functions_framework.cloud_event def main(cloud_event: CloudEvent) -> None: """ Main function, likely triggered by an eventarc event coming from firestore. Acts as a lif cycle manager for cloud scheduling rules that triggers cloud workflows pipelines through pipeline executor function, creating, updating and deleting them when necessary. Args: request: The incoming eventarc request object. """ print(f"EVENT::: path: {cloud_event}") firestore_payload = firestoredata.DocumentEventData() firestore_payload._pb.ParseFromString(cloud_event.data) document_value = None if firestore_payload.value: document_value = firestore_payload.value elif firestore_payload.old_value: document_value = firestore_payload.old_value path_parts = document_value.name.split("/") separator_idx = path_parts.index("documents") collection_path = path_parts[separator_idx + 1] document_path = "/".join(path_parts[(separator_idx + 2) :]) job_name = document_path print(f"Collection path: {collection_path}") print(f"Document path: {document_path}") if determine_job_type(firestore_payload.old_value, firestore_payload.value) in ('CREATE','UPDATE'): crond_expression = firestore_payload.value.fields["crond_expression"].string_value validation_date_pattern = firestore_payload.value.fields["date_format"].string_value time_zone = firestore_payload.value.fields["time_zone"].string_value workflow_status = firestore_payload.value.fields["workflow_status"].string_value workflow_properties = firestore_payload.value.fields["workflow_properties"].string_value workflow_parameters = { "workflows_name" : job_name, "validation_date_pattern" : validation_date_pattern, "same_day_execution" : "YESTERDAY", "workflow_status" : workflow_status, "workflow_properties" : workflow_properties } if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'CREATE': create_job(job_name, crond_expression, time_zone, workflow_parameters) if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'UPDATE': update_job(job_name, crond_expression, time_zone, workflow_parameters) if determine_job_type(firestore_payload.old_value, firestore_payload.value) in ('CREATE','UPDATE'): change_status(job_name, firestore_payload.value) if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'DELETE': delete_job(job_name) def determine_job_type(old_value ,new_value): """ Evaluates what type of event was triggered in firestore: CREATE, UPDATE or DELETE Args: old value: old firestore value coming in trigger info new value: new firestore value coming in trigger info Returns: type of event """ if new_value and not old_value: return 'CREATE' elif old_value and not new_value: return 'DELETE' elif old_value and new_value: return 'UPDATE' def create_job(job_name, crond_expression, time_zone, workflow_parameters): """ creates a scheduler job , using given parameters. Args: job_name: name for the scheduler job, should be the same as cloud workflows name crond_expression: crond linux expression used to trigger the cloud scheduler rule time_zone: timezone associated with scheduler execution. workflow_parameters: parameters sent to the cloud workflows invocation """ parent= scheduler_client.common_location_path(WORKFLOW_SCHEDULING_PROJECT_ID,WORKFLOW_SCHEDULING_PROJECT_REGION) job={ "name":"projects/"+ WORKFLOW_SCHEDULING_PROJECT_ID+ "/locations/"+WORKFLOW_SCHEDULING_PROJECT_REGION+"/jobs/" + job_name, "description":"workflows scheduler job create", "http_target": { "http_method": "POST", "uri": f"https://{WORKFLOW_SCHEDULING_PROJECT_REGION}-{WORKFLOW_SCHEDULING_PROJECT_ID}.cloudfunctions.net/{PIPELINE_EXECUTION_FUNCTION_NAME}" , "headers": {"Content-Type": "application/json"}, "oidc_token": {"service_account_email": WORKFLOW_SCHEDULING_PROJECT_NUMBER + "-compute@developer.gserviceaccount.com"}, "body": json.dumps(workflow_parameters).encode("utf-8"), }, "schedule":crond_expression, "time_zone":time_zone, } scheduler_client.create_job(parent=parent,job=job) print("JOB CREATED...........") def update_job(job_name, crond_expression, time_zone, workflow_parameters): """ updates a scheduler job , using given parameters. Args: job_name: name for the scheduler job, should be the same as cloud workflows name crond_expression: crond linux expression used to trigger the cloud scheduler rule time_zone: timezone associated with scheduler execution. workflow_parameters: parameters sent to the cloud workflows invocation """ job={ "name":"projects/"+ WORKFLOW_SCHEDULING_PROJECT_ID+ "/locations/"+WORKFLOW_SCHEDULING_PROJECT_REGION+"/jobs/" + job_name, "description":"workflows scheduler job update", "http_target": { "http_method": "POST", "uri": f"https://{WORKFLOW_SCHEDULING_PROJECT_REGION}-{WORKFLOW_SCHEDULING_PROJECT_ID}.cloudfunctions.net/{PIPELINE_EXECUTION_FUNCTION_NAME}" , "headers": {"Content-Type": "application/json"}, "oidc_token": {"service_account_email": WORKFLOW_SCHEDULING_PROJECT_NUMBER + "-compute@developer.gserviceaccount.com"}, "body": json.dumps(workflow_parameters).encode("utf-8"), }, "schedule":crond_expression, "time_zone":time_zone, } scheduler_client.update_job(job=job) print("JOB UPDATED...........") def delete_job(job_name): """ deletes a scheduler job , using given parameters. Args: job_name: name for the scheduler job, should be the same as cloud workflows name """ final_job_name = "projects/"+ WORKFLOW_SCHEDULING_PROJECT_ID+ "/locations/"+WORKFLOW_SCHEDULING_PROJECT_REGION+"/jobs/" + job_name scheduler_client.delete_job(name=final_job_name) print("JOB DELETED...........") def change_status(job_name, new_value): """ evaluates if a cloud scheduler rule must be paused or resumed, depending on firestore trigger Args: job_name: name for the scheduler job, should be the same as cloud workflows name new value: new firestore value coming in trigger info """ workflow_status = new_value.fields["workflow_status"].string_value print(f"workflow_status: {workflow_status} ") if workflow_status == 'DISABLED': pause_job(job_name) if workflow_status == 'ENABLED': resume_job(job_name) def pause_job(job_name): """ pauses a scheduler job , using given parameters. Args: job_name: name for the scheduler job, should be the same as cloud workflows name """ final_job_name = "projects/"+ WORKFLOW_SCHEDULING_PROJECT_ID+ "/locations/"+WORKFLOW_SCHEDULING_PROJECT_REGION+"/jobs/" + job_name scheduler_client.pause_job(name=final_job_name) print("JOB PAUSED...........") def resume_job(job_name): """ resumes a scheduler job , using given parameters. Args: job_name: name for the scheduler job, should be the same as cloud workflows name """ final_job_name = "projects/"+ WORKFLOW_SCHEDULING_PROJECT_ID+ "/locations/"+WORKFLOW_SCHEDULING_PROJECT_REGION+"/jobs/" + job_name scheduler_client.resume_job(name=final_job_name) print("JOB RESUMED...........")