in tools/airpiler/airpiler.py [0:0]
def visit_expr(self, _node, visited_children):
    """Convert the visited job definitions into DAG configuration dicts.

    Every element of ``visited_children`` is indexed with ``[0]`` to obtain a
    mapping of ``job name -> job attributes``.  The attributes read here are
    ``job_type`` (required), ``box_name``, ``command``, ``condition``,
    ``description`` and ``owner``.  Jobs are converted in the order given:

    * ``box`` without ``box_name``: becomes a DAG stored as ``<name>_DAG``
      holding ``default_args`` and a ``task_group_<name>`` task group.
    * ``box`` with ``box_name``: becomes a ``task_group_<name>`` entry that
      is merged into the DAG owning the parent box.
    * ``cmd`` with ``box_name``: becomes a ``task_<name>`` entry that is
      merged into the DAG owning the parent box.
    * ``cmd`` without ``box_name``: becomes a DAG of its own, stored under
      ``<name>`` with a single ``task_cmd_<name>`` task.

    Jobs matching none of these shapes are skipped.  The attribute
    dictionaries are modified in place while they are converted.

    Args:
        _node: Unused.
        visited_children: Iterable of sequences whose first item is a
            ``{job_name: attributes}`` dictionary.

    Returns:
        dict: Mapping of DAG name to its configuration dictionary.

    Raises:
        KeyError: If a job has no ``job_type``, or a ``cmd`` job has no
            ``command``.
        IndexError: If a job references a box before any DAG was created.
    """
    bash_operator = "airflow.operators.bash_operator.BashOperator"
    # Compiled once for all jobs; captures NAME out of every "s(NAME)".
    condition_regex = re.compile(r"s\((\w+)\)")
    # create the result dictionary
    result = {}

    def add_dependency(entry, dependency):
        """ Appends dependency to entry['dependencies'] (no duplicates)"""
        dependencies = entry.get("dependencies") or []
        if dependency not in dependencies:
            dependencies.append(dependency)
        entry["dependencies"] = dependencies

    def find_dag_name(box_name):
        """ Returns the name of the DAG that owns the given box"""
        group_name = f"task_group_{box_name}"
        for dag_name, dag in result.items():
            if group_name in (dag.get("task_groups") or {}):
                return dag_name
        if not result:
            raise IndexError(
                f"job references box {box_name!r} but no DAG exists yet")
        # Unknown parent box: fall back to the first DAG, which is what
        # every boxed job used to be attached to.
        return next(iter(result))

    def setup_default_args(value):
        """ Add the default args section to the job dictionary"""
        if "default_args" not in value:
            value["default_args"] = {"start_date": "1 days"}
        for def_arg in ("owner",):
            if def_arg in value:
                value["default_args"][def_arg] = value.pop(def_arg)

    def create_dependencies(value, entry):
        """ Converts the condition statement to dependencies of entry"""
        # ``entry`` is the task / task group generated for the current job.
        # Dependencies are always recorded on it, whatever kind of job the
        # dependency itself turns out to be.
        for dep in condition_regex.findall(value["condition"]):
            for dag in result.values():
                # check if the dependency is one of the tasks
                if (dag.get("tasks") or {}).get(f"task_{dep}"):
                    add_dependency(entry, f"task_{dep}")
                # check if the dependency is one of the task groups
                if (dag.get("task_groups") or {}).get(f"task_group_{dep}"):
                    add_dependency(entry, f"task_group_{dep}")
        # clean up the converted field
        del value["condition"]

    def setup_task_group(key, value):
        """ Adds a taskgroup section"""
        if not value.get("task_groups"):
            value["task_groups"] = {}
        group = {"tooltip": value.get("description") or key}
        value["task_groups"][f"task_group_{key}"] = group
        # This creates a dependency if you have a nested box within a box
        parent_box = value.get("box_name")
        if parent_box:
            parent_group = f"task_group_{parent_box}"
            for dag in result.values():
                if (dag.get("task_groups") or {}).get(parent_group):
                    add_dependency(group, parent_group)
        # check if a condition statement exists to set it as a dependency
        if value.get("condition"):
            create_dependencies(value, group)

    def setup_task(key, value):
        """ Adds a task section"""
        task_name = f"task_{key}"
        if not value.get("tasks"):
            value["tasks"] = {task_name: {}}
        task = value["tasks"][task_name]
        task.update({
            "operator": bash_operator,
            "bash_command": f'echo [{value["command"]}]',
        })
        # check if a condition statement exists to set it as a dependency
        if value.get("condition"):
            create_dependencies(value, task)
        if value.get("box_name"):
            task["task_group_name"] = f'task_group_{value.get("box_name")}'
        # tasks can't have descriptions only dags/top level boxes can
        if value.get("description"):
            del value["description"]
        # clean up the converted field
        del value["command"]

    for child in visited_children:
        for key, value in child[0].items():
            job_type = value["job_type"]
            box_name = value.get("box_name")
            # Convert top level Box to DAG
            if job_type == "box" and not box_name:
                setup_default_args(value)
                setup_task_group(key, value)
                # Clean Up
                if value.get("description"):
                    del value["description"]
                del value["job_type"]
                result[f"{key}_DAG"] = value
            # Convert Box inside a box into a TaskGroup
            elif job_type == "box":
                dag = result[find_dag_name(box_name)]
                setup_task_group(key, value)
                dag.setdefault("task_groups", {}).update(
                    value["task_groups"])
            # Convert Commands inside Boxes into Tasks of the TaskGroups
            elif job_type == "cmd" and box_name:
                dag = result[find_dag_name(box_name)]
                setup_task(key, value)
                if dag.get("tasks"):
                    dag["tasks"].update(value["tasks"])
                else:
                    dag["tasks"] = value["tasks"]
                # clean up
                del value["box_name"]
                del value["job_type"]
                # "owner" is optional, so it must not be required here
                value.pop("owner", None)
            # Convert Stand Alone Commands into a DAG
            elif job_type == "cmd" and "box_name" not in value:
                # Populate the Default Args
                setup_default_args(value)
                # Populate the Task
                task = {
                    "operator": bash_operator,
                    "bash_command": f'echo [{value["command"]}]',
                }
                value["tasks"] = {f"task_cmd_{key}": task}
                if value.get("condition"):
                    create_dependencies(value, task)
                # Clean Up
                del value["command"]
                del value["job_type"]
                result[f"{key}"] = value
    return result