def main()

in functions/orchestration-helpers/scheduling/main.py [0:0]


def main(cloud_event: CloudEvent) -> None:
    """
    Main function, likely triggered by an eventarc event coming from firestore.
    Acts as a life cycle manager for cloud scheduling rules that trigger cloud workflows pipelines
    through the pipeline executor function, creating, updating and deleting them when necessary.

    The job name is the document path, i.e. everything after the collection segment in the
    Firestore document name. For CREATE and UPDATE events the schedule settings are read from the
    new document's ``crond_expression``, ``date_format``, ``time_zone``, ``workflow_status`` and
    ``workflow_properties`` string fields.

    Args:
        cloud_event: The incoming eventarc event. Its ``data`` attribute must hold the serialized
            Firestore ``DocumentEventData`` message.

    Raises:
        ValueError: If the event carries neither a new nor an old document value, or if the
            document name does not contain a ``documents`` segment.
    """
    print(f"EVENT::: path: {cloud_event}")
    firestore_payload = firestoredata.DocumentEventData()
    firestore_payload._pb.ParseFromString(cloud_event.data)

    new_value = firestore_payload.value
    old_value = firestore_payload.old_value

    # Prefer the new document snapshot; when it is empty fall back to the previous one so the
    # document name is still available (presumably the delete case -- verify against trigger).
    if new_value:
        document_value = new_value
    elif old_value:
        document_value = old_value
    else:
        # Without any snapshot there is no document name to derive the job from; fail with a
        # clear message instead of an AttributeError on None.
        raise ValueError("Firestore event contains neither a new nor an old document value")

    # Document names look like ".../documents/<collection>/<document path...>".
    path_parts = document_value.name.split("/")
    separator_idx = path_parts.index("documents")
    collection_path = path_parts[separator_idx + 1]
    document_path = "/".join(path_parts[(separator_idx + 2):])
    job_name = document_path

    print(f"Collection path: {collection_path}")
    print(f"Document path: {document_path}")

    # Classify the event once; the inputs do not change for the rest of the function.
    # NOTE(review): assumes determine_job_type is a pure function of its arguments -- confirm.
    job_type = determine_job_type(old_value, new_value)

    if job_type in ('CREATE', 'UPDATE'):
        fields = new_value.fields
        crond_expression = fields["crond_expression"].string_value
        validation_date_pattern = fields["date_format"].string_value
        time_zone = fields["time_zone"].string_value
        workflow_status = fields["workflow_status"].string_value
        workflow_properties = fields["workflow_properties"].string_value
        workflow_parameters = {
            "workflows_name": job_name,
            "validation_date_pattern": validation_date_pattern,
            "same_day_execution": "YESTERDAY",
            "workflow_status": workflow_status,
            "workflow_properties": workflow_properties,
        }
        if job_type == 'CREATE':
            create_job(job_name, crond_expression, time_zone, workflow_parameters)
        else:
            update_job(job_name, crond_expression, time_zone, workflow_parameters)
        # The status is applied only after the job has been created or updated.
        change_status(job_name, new_value)
    elif job_type == 'DELETE':
        delete_job(job_name)