def index()

in src/translation/event_listener/main.py [0:0]


def index():
    valid_subfolders_list = ["ddl", "dml", "sql", "data"]

    if not request.is_json:
        print("pushed message without body")
        return errors.NO_BODY
    else:
        event_json = request.get_json()
        print(f"event_json: {event_json}")

    if not event_json:
        print("pushed message without body")
        return errors.NO_BODY

    if not isinstance(event_json, dict):
        print("invalid message format")
        return errors.INVALID_MESSAGE_FORMAT

    if "subscription" not in event_json or "message" not in event_json:
        print("pushed message must contain subscription and message field")
        return errors.SUBSCRIPTION_AND_MESSAGE_REQUIRED

    subscription = event_json["subscription"]
    next_dag_id = routing.get_dag_id(subscription)

    if not next_dag_id:
        print(f"no routing rule found to handle message from {subscription}")
        return errors.NO_ROUTING_RULE_FOUND

    event_type = event_json["message"]["attributes"]["eventType"]
    print(f"event type: {event_type}")

    if event_type == "OBJECT_FINALIZE":
        object_id = event_json["message"]["attributes"]["objectId"]
        subfolder = object_id.split("/")[0]
        file_name = os.path.basename(object_id)
        file_extension = file_name.split(".")[-1]
        if subfolder not in valid_subfolders_list or file_extension != "json":
            print(
                "Please ensure you upload only JSON configuration files to trigger DMT workflow in the folders config/[ddl, dml, sql,data] paths only"
            )
            print(
                f"File with extension: {file_extension} uploaded in subfolder: {subfolder}"
            )
            return errors.INVALID_FORMAT_OR_SUBFOLDER_FOUND

    dag_config = event_json
    status = composer.run_dag(routing.get_composer_env_url(), next_dag_id, dag_config)
    print(f"composer dag return status: {status}")

    if status >= 200 and status < 300:
        return {}, HTTPStatus.OK
    elif status >= 400 and status < 500:
        return {}, HTTPStatus.BAD_REQUEST
    elif status >= 500 and status < 600:
        return {}, HTTPStatus.BAD_GATEWAY