def visit_expr()

in tools/airpiler/airpiler.py [0:0]


    def visit_expr(self, _node, visited_children):
        """Build the overall output dictionary from the visited jobs.

        Every entry of ``visited_children`` must be indexable, with element
        ``0`` being a ``{job_name: attributes}`` dictionary. The attribute
        dictionaries are converted in place, depending on their ``job_type``
        and ``box_name`` entries:

        * ``box`` without ``box_name``: stored as ``<job_name>_DAG`` with a
          ``default_args`` and a ``task_groups`` section.
        * ``box`` with ``box_name``: its task group is merged into the DAG
          that holds the parent box's task group.
        * ``cmd`` with ``box_name``: turned into a BashOperator task that is
          merged into the DAG that holds the box's task group.
        * ``cmd`` without a ``box_name`` key: stored as ``<job_name>`` with a
          single ``task_cmd_<job_name>`` task.

        Jobs matching none of these cases are left out of the result.

        Args:
            _node: Unused.
            visited_children: Iterable of already visited children, see above.

        Returns:
            dict: Mapping of DAG name to the converted DAG description.

        Raises:
            KeyError: If a job has no ``job_type`` or a command job has no
                ``command``.
            IndexError: If a boxed job is met before any DAG was created.
        """
        condition_pattern = r"s\((\w+)\)"
        bash_operator = "airflow.operators.bash_operator.BashOperator"
        result = {}

        def known(section, name):
            """Tell whether ``name`` exists in ``section`` of any DAG so far."""
            return any(
                (dag.get(section) or {}).get(name) for dag in result.values())

        def add_dependency(target, name):
            """Append ``name`` to ``target['dependencies']`` exactly once."""
            current = target.get("dependencies")
            if not current:
                target["dependencies"] = [name]
            elif name not in current:
                current.append(name)

        def setup_default_args(value):
            """Add the default args section and move ``owner`` into it."""
            if "default_args" not in value:
                value["default_args"] = {"start_date": "1 days"}
            for def_arg in ("owner",):
                if def_arg in value:
                    value["default_args"][def_arg] = value.pop(def_arg)

        def create_dependencies(value, target):
            """Convert the condition statement of ``value`` to dependencies.

            The dependencies are recorded on ``target``, the task or task
            group that was built for the job, so the lookup never relies on
            a section the job does not have.
            """
            condition = value.get("condition")
            if not condition:
                return
            names = re.findall(condition_pattern, condition)
            if not names:
                # Nothing convertible: the condition is kept untouched.
                return
            for dep in names:
                if known("tasks", f"task_{dep}"):
                    add_dependency(target, f"task_{dep}")
                if known("task_groups", f"task_group_{dep}"):
                    add_dependency(target, f"task_group_{dep}")
            # clean up the converted field
            del value["condition"]

        def setup_task_group(key, value):
            """Add the task group section for the box ``key``."""
            if not value.get("task_groups"):
                value["task_groups"] = {}
            group = {"tooltip": value.get("description") or key}
            value["task_groups"][f"task_group_{key}"] = group

            # A box nested within a box depends on the enclosing box.
            parent = value.get("box_name")
            if parent and known("task_groups", f"task_group_{parent}"):
                add_dependency(group, f"task_group_{parent}")

            create_dependencies(value, group)

        def setup_task(key, value):
            """Add the task section for the boxed command ``key``."""
            if not value.get("tasks"):
                value["tasks"] = {}
            task = value["tasks"].setdefault(f"task_{key}", {})
            task.update({
                "operator": bash_operator,
                "bash_command": f'echo [{value["command"]}]',
            })

            create_dependencies(value, task)

            box_name = value.get("box_name")
            if box_name:
                task["task_group_name"] = f"task_group_{box_name}"

            # tasks can't have descriptions only dags/top level boxes can
            if value.get("description"):
                del value["description"]

            # clean up the converted field
            del value["command"]

        def find_dag(box_name):
            """Return the name of the DAG holding the task group of a box."""
            group_name = f"task_group_{box_name}"
            for dag_name, dag in result.items():
                if group_name in (dag.get("task_groups") or {}):
                    return dag_name
            # Unknown box: fall back to the first DAG. This raises
            # IndexError when no DAG exists yet.
            return list(result)[0]

        def merge_section(dag, section, entries):
            """Merge ``entries`` into ``dag[section]``, creating it if needed."""
            if dag.get(section):
                dag[section].update(entries)
            else:
                dag[section] = entries

        for child in visited_children:
            for key, value in child[0].items():
                job_type = value["job_type"]
                box_name = value.get("box_name")

                if job_type == "box" and not box_name:
                    # Top level box -> DAG
                    setup_default_args(value)
                    setup_task_group(key, value)

                    if value.get("description"):
                        del value["description"]
                    del value["job_type"]

                    result[f"{key}_DAG"] = value

                elif job_type == "box":
                    # Box inside a box -> task group of the parent's DAG
                    dag_name = find_dag(box_name)
                    setup_task_group(key, value)
                    merge_section(
                        result[dag_name], "task_groups", value["task_groups"])

                elif job_type == "cmd" and box_name:
                    # Command inside a box -> task of the box's DAG
                    dag_name = find_dag(box_name)
                    setup_task(key, value)
                    merge_section(result[dag_name], "tasks", value["tasks"])

                    # clean up; ``owner`` is optional on boxed commands
                    del value["box_name"]
                    del value["job_type"]
                    if value.get("owner"):
                        del value["owner"]

                elif job_type == "cmd" and "box_name" not in value:
                    # Stand alone command -> DAG with a single task
                    setup_default_args(value)

                    task = {
                        "operator": bash_operator,
                        "bash_command": f'echo [{value["command"]}]',
                    }
                    value["tasks"] = {f"task_cmd_{key}": task}

                    create_dependencies(value, task)

                    del value["command"]
                    del value["job_type"]
                    result[f"{key}"] = value

        return result