in functions/orchestration-helpers/scheduling/main.py [0:0]
def main(cloud_event: CloudEvent) -> None:
"""
Main function, likely triggered by an eventarc event coming from firestore.
Acts as a lif cycle manager for cloud scheduling rules that triggers cloud workflows pipelines through
pipeline executor function, creating, updating and deleting them when necessary.
Args:
request: The incoming eventarc request object.
"""
print(f"EVENT::: path: {cloud_event}")
firestore_payload = firestoredata.DocumentEventData()
firestore_payload._pb.ParseFromString(cloud_event.data)
document_value = None
if firestore_payload.value:
document_value = firestore_payload.value
elif firestore_payload.old_value:
document_value = firestore_payload.old_value
path_parts = document_value.name.split("/")
separator_idx = path_parts.index("documents")
collection_path = path_parts[separator_idx + 1]
document_path = "/".join(path_parts[(separator_idx + 2) :])
job_name = document_path
print(f"Collection path: {collection_path}")
print(f"Document path: {document_path}")
if determine_job_type(firestore_payload.old_value, firestore_payload.value) in ('CREATE','UPDATE'):
crond_expression = firestore_payload.value.fields["crond_expression"].string_value
validation_date_pattern = firestore_payload.value.fields["date_format"].string_value
time_zone = firestore_payload.value.fields["time_zone"].string_value
workflow_status = firestore_payload.value.fields["workflow_status"].string_value
workflow_properties = firestore_payload.value.fields["workflow_properties"].string_value
workflow_parameters = {
"workflows_name" : job_name,
"validation_date_pattern" : validation_date_pattern,
"same_day_execution" : "YESTERDAY",
"workflow_status" : workflow_status,
"workflow_properties" : workflow_properties
}
if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'CREATE':
create_job(job_name, crond_expression, time_zone, workflow_parameters)
if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'UPDATE':
update_job(job_name, crond_expression, time_zone, workflow_parameters)
if determine_job_type(firestore_payload.old_value, firestore_payload.value) in ('CREATE','UPDATE'):
change_status(job_name, firestore_payload.value)
if determine_job_type(firestore_payload.old_value, firestore_payload.value) == 'DELETE':
delete_job(job_name)