in src/modules/get_pcmk_properties_db.py [0:0]
def parse_ha_cluster_config(self):
    """
    Collect HA cluster, OS and global.ini parameters and record the outcome.

    The CIB is queried once per scope (``rsc_defaults``, ``crm_config``,
    ``op_defaults``, ``constraints``, ``resources``) and each response is
    handed to the matching parser. The ``op_defaults`` scope is skipped when
    ``self.os_type`` is ``"REDHAT"``. OS and global.ini parameters are
    appended afterwards.

    Collection is best-effort: a failure in one scope or source is appended
    to ``self.result["message"]`` (one entry per line) and processing moves
    on to the next one.

    Side effects:
        * ``self.category`` is set to each processed scope in turn.
        * ``self.result["details"]`` receives ``{"parameters": [...]}``.
        * ``self.result["status"]`` becomes ``TestStatus.ERROR.value`` when
          any parameter has that status or carries no ``"status"`` key,
          otherwise ``TestStatus.SUCCESS.value``.
        * ``self.result["message"]`` is extended with failure notes and a
          closing summary line.
    """
    parameters = []
    scopes = ("rsc_defaults", "crm_config", "op_defaults", "constraints", "resources")

    for scope in scopes:
        if scope == "op_defaults" and self.os_type == "REDHAT":
            continue

        self.category = scope
        root = self.parse_xml_output(self.execute_command_subprocess(CIB_ADMIN(scope=scope)))

        # Skip a missing or childless document. Spelled out explicitly because
        # the truth value of an ElementTree element is deprecated (Python 3.12+).
        # NOTE(review): assumes parse_xml_output returns an Element or None - confirm.
        if root is None or len(root) == 0:
            continue

        # Each branch deliberately catches broadly: one unparsable scope must
        # not prevent the remaining scopes from being reported.
        if self.category in self.BASIC_CATEGORIES:
            try:
                xpath = self.BASIC_CATEGORIES[self.category][0]
                for element in root.findall(xpath):
                    parameters.extend(self._parse_basic_config(element, self.category))
            except Exception as ex:
                self.result["message"] += (
                    f"Failed to get {self.category} configuration: {ex} \n"
                )
        elif self.category == "resources":
            try:
                for sub_category, xpath in self.RESOURCE_CATEGORIES.items():
                    for element in root.findall(xpath):
                        parameters.extend(self._parse_resource(element, sub_category))
            except Exception as ex:
                self.result["message"] += (
                    f"Failed to get resources configuration for {self.category}: {ex} \n"
                )
        elif self.category == "constraints":
            try:
                parameters.extend(self._parse_constraints(root))
            except Exception as ex:
                self.result["message"] += f"Failed to get constraints configuration: {ex} \n"

    try:
        parameters.extend(self._parse_os_parameters())
    except Exception as ex:
        self.result["message"] += f"Failed to get OS parameters: {ex} \n"

    try:
        parameters.extend(self._parse_global_ini_parameters())
    except Exception as ex:
        self.result["message"] += f"Failed to get global.ini parameters: {ex} \n"

    # A parameter without a "status" key is treated as failed.
    has_failures = any(
        param.get("status", TestStatus.ERROR.value) == TestStatus.ERROR.value
        for param in parameters
    )
    self.result.update(
        {
            "details": {"parameters": parameters},
            "status": TestStatus.ERROR.value if has_failures else TestStatus.SUCCESS.value,
        }
    )
    # NOTE(review): this summary is appended even when the status is ERROR or
    # failures were logged above; the text is kept unchanged because callers
    # may match on it.
    self.result["message"] += "HA Parameter Validation completed successfully."