in dagify/converter/engine.py [0:0]
def airflow_task_build(task, template):
    """Render the template's output structure for a single task.

    Each entry in ``template["mappings"]`` copies one source attribute of
    ``task`` into one placeholder of ``template["structure"]``, optionally
    passing the value through the mapping's rules first.

    Args:
        task: Object exposing ``get_attribute(name)``,
            ``set_attribute(name, value)`` and ``get_in_conditions()``.
            Its source attributes are overwritten with the rule-processed
            values as a side effect.
        template: Dict with a ``"structure"`` format string, a
            ``"mappings"`` iterable of mapping dicts (keys read here:
            ``target``, ``source``, ``rules``, ``default``) and
            ``["metadata"]["name"]``, which is used in error messages.

    Returns:
        The ``"structure"`` string formatted with the collected values.

    Raises:
        ValueError: If ``template["structure"]`` or
            ``template["mappings"]`` is ``None``.
        KeyError: If the structure references a placeholder for which no
            value was collected (raised by ``str.format``).
    """
    # Load the Template Output Structure
    if template["structure"] is None:
        raise ValueError(
            f"dagify: no output structure in template: {template['metadata']['name']}, conversion will perform no action")
    if template["mappings"] is None:
        raise ValueError(
            f"dagify: no mappings in template: {template['metadata']['name']}, conversion will perform no action")

    # Output values, keyed by the placeholder names used in the structure.
    values = {}
    for mapping in template["mappings"]:
        target_key = mapping.get("target", None)
        if target_key is None:
            # A mapping without a target cannot fill any placeholder; skip it.
            continue

        source_key = mapping.get("source", "")
        target_value = _apply_mapping_rules(
            mapping.get("rules"), task.get_attribute(source_key))

        # Write the processed value back so the task object reflects it.
        # NOTE(review): done once per mapping, after all rules have run;
        # confirm this matches the intended placement in the original file.
        task.set_attribute(source_key, target_value)

        if target_value is None:
            # TODO - Log That we are going to use the defaults
            target_value = mapping.get("default", None)
        if target_value is None:
            # TODO - Log No Default Found, Handle with !UNKNOWN!!
            target_value = "!!UNKNOWN!!"
        values[target_key] = target_value

    # Explicit Trigger Rule Handling
    # By default, the trigger rule will be all_success unless there is an
    # in-condition with "OR" relationship. This is assigned after the
    # mappings, so it overrides any mapping that targets "trigger_rule".
    has_or_condition = any(
        in_condition.get_attribute("AND_OR") == 'O'
        for in_condition in task.get_in_conditions())
    values["trigger_rule"] = "one_success" if has_or_condition else "all_success"

    # Construct Output Python Object Text
    return template["structure"].format(**values)


def _apply_mapping_rules(rules, value):
    """Run ``value`` through each rule in order and return the result."""
    if not rules:
        # Covers both a missing "rules" key and an explicit null value.
        print("No Rules applied to source during mapping")
        return value
    for rule in rules:
        print(f"Apply Rule {rule.get('rule')}")
        r = Rule()
        # Rule.run receives [rule name, current value, *extra args].
        args = [rule.get("rule"), value]
        # "args" may be present but null in the template; treat as empty.
        args.extend(rule.get("args") or [])
        value = r.run(args)
    return value