in source/generate_dag.py [0:0]
def import_python_functions(yaml_config: dict) -> dict:
    """
    Collect Python function source code to be injected into a DAG.

    Sources are read from the 'custom_python_functions' section of the
    YAML configuration, in this order:

    1. The whole content of 'functions_file_path', when
       'import_functions_from_file' is truthy and a path is given.
    2. The 'code' value of every entry in 'custom_defined_functions'.
       Line endings are normalised to '\\n' and the final line terminator
       is dropped; leading indentation of each line is preserved.

    Entries whose 'code' is missing, empty, blank, or not a string are
    skipped, as are entries that are not dictionaries.

    Args:
        yaml_config (dict): YAML configuration for the task.

    Returns:
        dict: ``{"add_functions": True, "functions": [...]}`` when at least
        one code string was collected, otherwise ``{"add_functions": False}``
        (the 'functions' key is absent in that case).

    Raises:
        OSError: If the configured functions file cannot be opened or read.
        UnicodeDecodeError: If the functions file is not valid UTF-8.
    """
    functions = []
    try:
        custom_functions = yaml_config['custom_python_functions']
        if not isinstance(custom_functions, dict):
            raise TypeError("'custom_python_functions' should be a dictionary.")

        if custom_functions.get('import_functions_from_file', False) and custom_functions.get('functions_file_path'):
            file_path = custom_functions['functions_file_path']
            print(f"Importing Python Functions: reading python functions from file {file_path}")
            # Python source is UTF-8 by default; do not depend on the locale.
            with open(file_path, 'r', encoding='utf-8') as file:
                functions.append(file.read())

        defined_functions = custom_functions.get('custom_defined_functions')
        if defined_functions:
            # A YAML list (or scalar) here would otherwise fail on .values()
            # with an AttributeError that the handler below does not catch.
            if not isinstance(defined_functions, dict):
                raise TypeError("'custom_defined_functions' should be a dictionary.")
            for func_name, func_data in defined_functions.items():
                if not isinstance(func_data, dict):
                    # Skip the bad entry instead of discarding the valid
                    # entries that follow it.
                    print(f"Importing Python Functions: skipping '{func_name}': entry is not a dictionary.")
                    continue
                code = func_data.get('code')
                # An empty `code:` key parses to None in YAML; treat any
                # non-string value like a blank code block.
                if isinstance(code, str) and code.strip():
                    # splitlines + join normalises line endings while
                    # keeping each line's indentation intact.
                    functions.append('\n'.join(code.splitlines()))
    except (KeyError, TypeError) as e:
        print(f"Importing Python Functions: Error processing YAML: {e}")

    if functions:
        return {
            "add_functions": True,
            "functions": functions
        }

    print("Importing Python Functions: Skipping function import.")
    return {"add_functions": False}