def airflow_task_build()

in dagify/converter/engine.py [0:0]


def airflow_task_build(task, template):
    # Load the Template Output Structure
    if template["structure"] is None:
        raise ValueError(
            f"dagify: no output structure in template: {template['metadata']['name']}, conversion will perform no action")

    if template["mappings"] is None:
        raise ValueError(
            f"dagify: no mappings in template: {template['metadata']['name']}, conversion will perform no action")

    # Declare Output Values Dictionary
    values = {}

    # Process each Mapping
    for mapping in template["mappings"]:
        # Lookup Mapping Target Key
        targetKey = mapping.get('target', None)
        if targetKey is None:
            # If Key is None, Skip
            continue

        # Load Target Value or Default Value for TargetKey from task source
        # field
        targetValue = task.get_attribute(mapping.get("source", ""))

        # Apply Rules
        # TODO: Handle Rules through additional function
        rules = mapping.get("rules", [])
        if rules is None:
            rules = []

        if len(rules) == 0:
            print("No Rules applied to source during mapping")

        for rule in rules:
            print(f"Apply Rule {rule.get('rule')}")
            r = Rule()
            args = [rule.get("rule"), targetValue]
            for arg in rule.get("args", []):
                args.append(arg)
            targetValue = r.run(args)

            # Update the Current Object Value
            task.set_attribute(mapping.get("source", ""), targetValue)

        if targetValue is None:
            # TODO - Log That we are going to use the defaults
            targetValue = mapping.get("default", None)
        if targetValue is None:
            # TODO - Log No Default Found, Handle with !UNKNOWN!!
            targetValue = "!!UNKNOWN!!"
        # Construct Values for Output!
        values[targetKey] = targetValue

    # Explicit Trigger Rule Handling
    # By default, the trigger rule will be all_success unless there is an in-condition with "OR" relationship
    trigger_rule = "all_success"
    for in_condition in task.get_in_conditions():
        if in_condition.get_attribute("AND_OR") == 'O':
            trigger_rule =  'one_success'
    values["trigger_rule"] = trigger_rule

    # Construct Output Python Object Text
    output = template["structure"].format(**values)
    return output