in o2a/utils/config_extractors.py [0:0]
def extract_properties_from_job_xml_nodes(job_xml_nodes: List[ET.Element], input_directory_path: str):
"""Extracts configuration properties from ``job_xml`` nodes"""
properties_dict: Dict[str, str] = dict()
for xml_file in job_xml_nodes:
file_name = xml_file.text
if not file_name:
raise ParseException(
'Element "job-xml" should have content, however its value is empty. Make sure the element '
"has the correct content."
)
file_path = path.join(input_directory_path, HDFS_FOLDER, file_name)
config_tree = ET.parse(file_path)
config_node = config_tree.getroot()
if not config_node:
raise ParseException(
"A job-xml configuration node is specified in the workflow XML, however its value is empty."
"Make sure the path to a configuration file is valid."
)
new_properties = extract_properties_from_configuration_node(config_node)
properties_dict.update(new_properties)
return properties_dict