# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
import uuid
from typing import Any, Dict, Optional

import mlflow
import mltable
import pandas as pd
import requests
from arg_helpers import get_from_args
from azureml.core import Model, Run, Workspace
# TODO: seems this method needs to be made public
from azureml.rai.utils.telemetry.loggerfactory import _extract_and_filter_stack
from constants import (MLFLOW_MODEL_SERVER_PORT, DashboardInfo,
                       PropertyKeyValues, RAIToolType)
from raiutils.common.retries import retry_function
from raiutils.exceptions import UserConfigValidationException
from responsibleai._internal._served_model_wrapper import ServedModelWrapper
from responsibleai.feature_metadata import FeatureMetadata

from responsibleai import RAIInsights
from responsibleai import __version__ as responsibleai_version

assetid_re = re.compile(
    r"azureml://locations/(?P<location>.*)/workspaces/(?P<workspaceid>.*)/(?P<assettype>.*)/(?P<assetname>.*)/versions/(?P<assetversion>.*)"  # noqa: E501
)
data_type = "data_type"

_logger = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)

# Directory names saved by RAIInsights might not match tool names
_tool_directory_mapping: Dict[str, str] = {
    RAIToolType.CAUSAL: "causal",
    RAIToolType.COUNTERFACTUAL: "counterfactual",
    RAIToolType.ERROR_ANALYSIS: "error_analysis",
    RAIToolType.EXPLANATION: "explainer",
}


class UserConfigError(Exception):
    def __init__(self, message, cause=None):
        if cause:
            self.tb = _extract_and_filter_stack(
                cause, traceback.extract_tb(sys.exc_info()[2])
            )
            self.cause = cause
        super().__init__(message)


def print_dir_tree(base_dir):
    print("\nBEGIN DIRTREE")
    for current_dir, subdirs, files in os.walk(base_dir):
        # Current iteration directory
        print(current_dir)

        # Directories
        for dirname in sorted(subdirs):
            print("\t" + dirname + "/")

        # Files
        for filename in sorted(files):
            print("\t" + filename)
    print("END DIRTREE\n", flush=True)


def fetch_model_id(model_info_path: str):
    model_info_path = os.path.join(
        model_info_path, DashboardInfo.MODEL_INFO_FILENAME
    )
    try:
        json_file = open(model_info_path, "r")
    except Exception:
        raise UserConfigValidationException(
            f"Failed to open {model_info_path}. "
            "Please ensure the model path is correct."
        )
    model_info = json.load(json_file)
    json_file.close()

    if DashboardInfo.MODEL_ID_KEY not in model_info:
        raise UserConfigValidationException(
            f"Invalid input, expecting key {DashboardInfo.MODEL_ID_KEY} "
            "to exist in the input json"
        )
    return model_info[DashboardInfo.MODEL_ID_KEY]

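# A minimal sketch of the expected input for fetch_model_id (illustrative
# only, not executed; the path and the JSON contents are hypothetical). The
# directory must contain DashboardInfo.MODEL_INFO_FILENAME, a JSON file whose
# DashboardInfo.MODEL_ID_KEY entry holds the "name:version" model id:
#
#     # <MODEL_INFO_FILENAME> contents: {"<MODEL_ID_KEY>": "my_model:1"}
#     model_id = fetch_model_id("/mnt/inputs/model_info")
#     # e.g. returns "my_model:1"
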
class InstallCondaEnv(object):
    def __init__(self, use_separate_conda_env: bool, conda_file: str,
                 model_path: str, model_id: str = None, model: Model = None):
        self.use_separate_conda_env = use_separate_conda_env
        self.conda_file = conda_file
        self.model_path = model_path
        self.model_id = model_id
        self.model = model

    def install(self):
        try:
            if self.use_separate_conda_env:
                # Generate some random characters to add to the model path
                random_chars = str(uuid.uuid4())[:8]
                tmp_model_path = "./mlflow_model" + random_chars
                model_path = self.model_path
                if not model_path and self.model_id:
                    model_path = Model.get_model_path(
                        model_name=self.model.name,
                        version=self.model.version
                    )
                shutil.copytree(model_path, tmp_model_path)
                model_uri = tmp_model_path
                _logger.info("MODEL URI: {}".format(model_uri))
                for root, _, files in os.walk(model_uri):
                    for f in files:
                        full_path = os.path.join(root, f)
                        _logger.info("FILE: {}".format(full_path))
                conda_install_command = [
                    "mlflow", "models", "prepare-env",
                    "-m", model_uri,
                    "--env-manager", "conda"
                ]
            else:
                # The mlflow model input is mounted read-only, but conda
                # needs write access, so work on a local copy.
                local_conda_dep = "./conda_dep.yaml"
                shutil.copyfile(self.conda_file, local_conda_dep)
                conda_prefix = str(pathlib.Path(sys.executable).parents[1])
                conda_install_command = [
                    "conda", "env", "update",
                    "--prefix", conda_prefix,
                    "-f", local_conda_dep
                ]
            install_log = subprocess.check_output(conda_install_command)
            _logger.info(
                "Conda dependency installation successful, logs: {}".format(
                    install_log
                )
            )
        except subprocess.CalledProcessError as e:
            _logger.error(
                "Installing dependency using conda.yaml from mlflow model "
                "failed: {}".format(e.output)
            )
            _classify_and_log_pip_install_error(e.output)
            raise e


def load_mlflow_model(
    workspace: Workspace,
    use_model_dependency: bool = False,
    use_separate_conda_env: bool = False,
    model_id: Optional[str] = None,
    model_path: Optional[str] = None,
) -> Any:
    model_uri = model_path
    mlflow.set_tracking_uri(workspace.get_mlflow_tracking_uri())

    model = None
    if model_id:
        try:
            model = Model._get(workspace, id=model_id)
        except Exception as e:
            raise UserConfigError(
                "Unable to retrieve model by model id {} in workspace {}, "
                "error:\n{}".format(model_id, workspace.name, e),
                e
            )
        model_uri = "models:/{}/{}".format(model.name, model.version)

    if use_model_dependency:
        conda_file = None
        if not use_separate_conda_env:
            try:
                conda_file = mlflow.pyfunc.get_model_dependencies(
                    model_uri, format="conda"
                )
            except Exception as e:
                raise UserConfigError(
                    "Failed to get model dependency from given model {}, "
                    "error:\n{}".format(model_uri, e),
                    e
                )
        try:
            installer = InstallCondaEnv(
                use_separate_conda_env, conda_file, model_path, model_id, model
            )
            action_name = "Install conda"
            err_msg = "Failed to install conda"
            max_retries = 3
            retry_delay = 60
            retry_function(installer.install, action_name, err_msg,
                           max_retries=max_retries,
                           retry_delay=retry_delay)
        except RuntimeError:
            raise UserConfigValidationException(
                "Installing dependency using conda environment spec from "
                "mlflow model failed. This behavior can be turned off by "
                "setting use_model_dependency to False in the job spec. "
                "You may also check the error log above to manually resolve "
                "the package conflict error."
            )
        _logger.info("Successfully installed model dependencies")

    try:
        if not use_separate_conda_env:
            model = mlflow.pyfunc.load_model(model_uri)._model_impl
            return model

        # Serve the model from the separate conda env using mlflow
        mlflow_models_serve_logfile_name = "./logs/azureml/mlflow_models_serve.log"
        try:
            # Run the mlflow model server in the background
            with open(mlflow_models_serve_logfile_name, "w") as logfile:
                model_serving_log = subprocess.Popen(
                    [
                        "mlflow", "models", "serve",
                        "-m", model_uri,
                        "--env-manager", "conda",
                        "-p", str(MLFLOW_MODEL_SERVER_PORT)
                    ],
                    close_fds=True,
                    stdout=logfile,
                    stderr=logfile
                )
            _logger.info("Started mlflow model server process in the background")
        except subprocess.CalledProcessError as e:
            _logger.error(
                f"Running mlflow models serve in the background failed: {e.output}"
            )
            _classify_and_log_pip_install_error(e.output)
            raise RuntimeError("Starting the mlflow model server failed.")

        # If the server started successfully then the logfile should contain
        # a line saying "Listening at: http".
        # If not, it could either need more time to start or it failed to
        # start. We can check whether the process ended by calling poll() on
        # it. Otherwise, we wait a predefined time and check again.
        for _ in range(10):
            with open(mlflow_models_serve_logfile_name, "r") as logfile:
                logs = logfile.read()
            if "Listening at: http" not in logs:
                if model_serving_log.poll() is not None:
                    # The process ended
                    raise RuntimeError(
                        f"Unable to start mlflow model server: {logs}"
                    )
                # The process is still running, wait and try again...
            else:
                try:
                    # Attempt to contact the mlflow model server:
                    # if the response is a 500 (due to the missing body) then
                    # the server is up; if it is a 404 then the server is
                    # still starting up and we need to wait.
                    test_response = requests.post(
                        f"http://localhost:{MLFLOW_MODEL_SERVER_PORT}/invocations"
                    )
                    if test_response.status_code == 500:
                        break
                except Exception as e:
                    _logger.info(
                        "Waiting for mlflow model server to start, "
                        "error: {}".format(e)
                    )
            time.sleep(5)
        else:
            raise RuntimeError("Unable to start mlflow model server.")
        _logger.info("Successfully started mlflow model server.")
        model = ServedModelWrapper(port=MLFLOW_MODEL_SERVER_PORT)
        _logger.info("Successfully loaded model.")
        return model
    except Exception as e:
        raise UserConfigError(
            "Unable to load mlflow model from {} in current environment "
            "due to error:\n{}".format(model_uri, e),
            e
        )


def _classify_and_log_pip_install_error(elog):
    ret_message = []
    if elog is None:
        return ret_message

    if b"Could not find a version that satisfies the requirement" in elog:
        ret_message.append("Detected unsatisfiable version requirement.")

    if b"package versions have conflicting dependencies" in elog:
        ret_message.append("Detected dependency conflict error.")

    for m in ret_message:
        _logger.warning(m)
    return ret_message


def load_mltable(mltable_path: str) -> pd.DataFrame:
    _logger.info(f"Attempting to load {mltable_path} as MLTable")
    try:
        assetid_path = os.path.join(mltable_path, "assetid")
        if os.path.exists(assetid_path):
            with open(assetid_path, "r") as assetid_file:
                mltable_path = assetid_file.read()

        tbl = mltable.load(mltable_path)
        df: pd.DataFrame = tbl.to_pandas_dataframe()
    except Exception as e:
        _logger.info(f"Failed to load {mltable_path} as MLTable.")
        raise e
    return df


def load_parquet(parquet_path: str) -> pd.DataFrame:
    _logger.info(f"Attempting to load {parquet_path} as parquet dataset")
    try:
        df = pd.read_parquet(parquet_path)
    except Exception as e:
        _logger.info(f"Failed to load {parquet_path} as parquet.")
        raise e
    return df


def load_dataset(dataset_path: str) -> pd.DataFrame:
    _logger.info(f"Attempting to load: {dataset_path}")
    exceptions = []
    is_load_successful = False

    try:
        df = load_mltable(dataset_path)
        is_load_successful = True
    except Exception as e:
        new_e = UserConfigError(
            f"Input dataset {dataset_path} cannot be read as mltable. "
            f"You may disregard this error if the dataset input is intended "
            f"to be a parquet dataset. Exception: {e}",
            e
        )
        exceptions.append(new_e)

    if not is_load_successful:
        try:
            df = load_parquet(dataset_path)
            is_load_successful = True
        except Exception as e:
            new_e = UserConfigError(
                f"Input dataset {dataset_path} cannot be read as parquet. "
                f"You may disregard this error if the dataset input is "
                f"intended to be an mltable. Exception: {e}",
                e
            )
            exceptions.append(new_e)

    if not is_load_successful:
        raise UserConfigError(
            f"Input dataset {dataset_path} cannot be read as MLTable or "
            f"parquet dataset. Please check that the input dataset is valid. "
            f"Exceptions encountered during reading: {exceptions}"
        )

    print(df.dtypes)
    print(df.head(10))
    return df


def load_dashboard_info_file(input_port_path: str) -> Dict[str, str]:
    # Load the rai_insights_dashboard file info
    rai_insights_dashboard_file = os.path.join(
        input_port_path, DashboardInfo.RAI_INSIGHTS_PARENT_FILENAME
    )
    with open(rai_insights_dashboard_file, "r") as si:
        dashboard_info = json.load(si, object_hook=default_object_hook)
    _logger.info("rai_insights_parent info: {0}".format(dashboard_info))
    return dashboard_info


def copy_dashboard_info_file(src_port_path: str, dst_port_path: str):
    src = pathlib.Path(src_port_path) / DashboardInfo.RAI_INSIGHTS_PARENT_FILENAME
    dst = pathlib.Path(dst_port_path) / DashboardInfo.RAI_INSIGHTS_PARENT_FILENAME

    shutil.copyfile(src, dst)


def create_rai_tool_directories(rai_insights_dir: pathlib.Path) -> None:
    # Have to create empty subdirectories for the managers.
    # The RAIInsights object expects these to be present, but since
    # directories don't actually exist in Azure Blob store they may not be
    # (some of the tools always have a file present, even if no tool
    # instances have been added).
    for v in _tool_directory_mapping.values():
        os.makedirs(rai_insights_dir / v, exist_ok=True)
    _logger.info("Added empty directories")


def load_rai_insights_from_input_port(input_port_path: str) -> RAIInsights:
    with tempfile.TemporaryDirectory() as incoming_temp_dir:
        incoming_dir = pathlib.Path(incoming_temp_dir)
        shutil.copytree(input_port_path, incoming_dir, dirs_exist_ok=True)
        _logger.info("Copied RAI Insights input to temporary directory")

        create_rai_tool_directories(incoming_dir)

        result = RAIInsights.load(incoming_dir)
        _logger.info("Loaded RAIInsights object")

    return result

def copy_insight_to_raiinsights(
    rai_insights_dir: pathlib.Path, insight_dir: pathlib.Path
) -> str:
    print("Starting copy")

    # Recall that we copy the JSON containing metadata from the constructor
    # component into each directory. This means we have that file and the
    # results directory present in the insight_dir.
    dir_items = list(insight_dir.iterdir())
    assert len(dir_items) == 2

    # We want the directory, not the JSON file
    if dir_items[0].name == DashboardInfo.RAI_INSIGHTS_PARENT_FILENAME:
        tool_dir_name = dir_items[1].name
    else:
        tool_dir_name = dir_items[0].name
    _logger.info("Detected tool: {0}".format(tool_dir_name))

    assert tool_dir_name in _tool_directory_mapping.values()
    for k, v in _tool_directory_mapping.items():
        if tool_dir_name == v:
            tool_type = k
    _logger.info("Mapped to tool: {0}".format(tool_type))

    tool_dir = insight_dir / tool_dir_name
    tool_dir_items = list(tool_dir.iterdir())
    assert len(tool_dir_items) == 1

    if tool_type == RAIToolType.EXPLANATION:
        # Explanations will have a directory already present for some
        # reason. Furthermore, we only support one explanation per dashboard
        # for some other reason. Put together, if we have an explanation, we
        # need to remove what's there already or we can get confused.
        _logger.info("Detected explanation, removing existing directory")
        for item in (rai_insights_dir / tool_dir_name).iterdir():
            _logger.info("Removing directory {0}".format(str(item)))
            shutil.rmtree(item)

    src_dir = insight_dir / tool_dir_name / tool_dir_items[0].parts[-1]
    dst_dir = rai_insights_dir / tool_dir_name / tool_dir_items[0].parts[-1]
    shutil.copytree(
        src=src_dir,
        dst=dst_dir,
    )

    _logger.info("Copy complete")
    return tool_type


def save_to_output_port(rai_i: RAIInsights, output_port_path: str, tool_type: str):
    with tempfile.TemporaryDirectory() as tmpdirname:
        rai_i.save(tmpdirname)
        _logger.info(f"Saved to {tmpdirname}")

        tool_dir_name = _tool_directory_mapping[tool_type]
        insight_dirs = os.listdir(pathlib.Path(tmpdirname) / tool_dir_name)
        assert len(insight_dirs) == 1, "Checking for exactly one tool output"
        _logger.info("Checking dirname is GUID")
        uuid.UUID(insight_dirs[0])

        target_path = pathlib.Path(output_port_path) / tool_dir_name
        target_path.mkdir()
        _logger.info("Created output directory")

        _logger.info("Starting copy")
        shutil.copytree(
            pathlib.Path(tmpdirname) / tool_dir_name,
            target_path,
            dirs_exist_ok=True,
        )
    _logger.info("Copied to output")


def add_properties_to_gather_run(
    dashboard_info: Dict[str, str], tool_present_dict: Dict[str, str]
):
    _logger.info("Adding properties to the gather run")
    gather_run = Run.get_context()

    run_properties = {
        PropertyKeyValues.RAI_INSIGHTS_TYPE_KEY:
            PropertyKeyValues.RAI_INSIGHTS_TYPE_GATHER,
        PropertyKeyValues.RAI_INSIGHTS_DASHBOARD_ID_KEY: dashboard_info[
            DashboardInfo.RAI_INSIGHTS_RUN_ID_KEY
        ],
        PropertyKeyValues.RAI_INSIGHTS_RESPONSIBLEAI_VERSION_KEY:
            responsibleai_version,
        PropertyKeyValues.RAI_INSIGHTS_MODEL_ID_KEY: dashboard_info[
            DashboardInfo.RAI_INSIGHTS_MODEL_ID_KEY
        ],
        PropertyKeyValues.RAI_INSIGHTS_TEST_DATASET_ID_KEY: dashboard_info[
            DashboardInfo.RAI_INSIGHTS_TEST_DATASET_ID_KEY
        ],
        PropertyKeyValues.RAI_INSIGHTS_TRAIN_DATASET_ID_KEY: dashboard_info[
            DashboardInfo.RAI_INSIGHTS_TRAIN_DATASET_ID_KEY
        ],
        PropertyKeyValues.RAI_INSIGHTS_DASHBOARD_TITLE_KEY: dashboard_info[
            DashboardInfo.RAI_INSIGHTS_DASHBOARD_TITLE_KEY
        ],
    }

    _logger.info("Appending tool present information")
    for k, v in tool_present_dict.items():
        key = PropertyKeyValues.RAI_INSIGHTS_TOOL_KEY_FORMAT.format(k)
        run_properties[key] = str(v)

    _logger.info("Making service call")
    gather_run.add_properties(run_properties)
    _logger.info("Properties added to gather run")

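# A sketch of how the gather step can use the helpers above (illustrative
# only, not executed; all paths are hypothetical). Each insight directory is
# merged into a working dashboard directory, then the run is tagged with the
# tools present. The constructor-side counterpart follows below:
# create_rai_insights_from_port_path drives load_dataset and
# load_mlflow_model from a constructor component's port path.
#
#     tool_type = copy_insight_to_raiinsights(
#         pathlib.Path("/mnt/work/dashboard"),
#         pathlib.Path("/mnt/inputs/insight_1"),
#     )
#     dashboard_info = load_dashboard_info_file("/mnt/inputs/dashboard")
#     add_properties_to_gather_run(dashboard_info, {tool_type: True})
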
def create_rai_insights_from_port_path(my_run: Run, port_path: str) -> RAIInsights:
    _logger.info("Creating RAIInsights from constructor component output")

    _logger.info("Loading data files")
    df_train = load_dataset(os.path.join(port_path, DashboardInfo.TRAIN_FILES_DIR))
    df_test = load_dataset(os.path.join(port_path, DashboardInfo.TEST_FILES_DIR))

    _logger.info("Loading config file")
    config = load_dashboard_info_file(port_path)
    constructor_args = config[DashboardInfo.RAI_INSIGHTS_CONSTRUCTOR_ARGS_KEY]
    _logger.info(f"Constructor args: {constructor_args}")

    _logger.info("Loading model")
    input_args = config[DashboardInfo.RAI_INSIGHTS_INPUT_ARGS_KEY]
    use_model_dependency = input_args["use_model_dependency"]
    model_id = config[DashboardInfo.RAI_INSIGHTS_MODEL_ID_KEY]
    _logger.info("Loading model: {0}".format(model_id))

    # For now, the separate conda env will only be used for forecasting.
    # At a later point, we might enable this for all task types.
    use_separate_conda_env = False
    if "task_type" in constructor_args:
        is_forecasting_task = constructor_args["task_type"] == "forecasting"
        use_separate_conda_env = is_forecasting_task
        constructor_args["forecasting_enabled"] = is_forecasting_task

    model_estimator = load_mlflow_model(
        workspace=my_run.experiment.workspace,
        use_model_dependency=use_model_dependency,
        use_separate_conda_env=use_separate_conda_env,
        model_id=model_id,
    )

    # Unwrap the model if it's an sklearn wrapper
    if model_estimator.__class__.__name__ == "_SklearnModelWrapper":
        model_estimator = model_estimator.sklearn_model

    _logger.info("Creating RAIInsights object")
    rai_i = RAIInsights(
        model=model_estimator, train=df_train, test=df_test, **constructor_args
    )
    return rai_i


def get_run_input_assets(run):
    return run.get_details()["runDefinition"]["inputAssets"]


def get_asset_information(assetid):
    match = assetid_re.match(assetid)
    return match.groupdict()


def get_train_dataset_id(run):
    return get_dataset_name_version(run, "train_dataset")


def get_test_dataset_id(run):
    return get_dataset_name_version(run, "test_dataset")


def get_dataset_name_version(run, dataset_input_name):
    aid = get_run_input_assets(run)[dataset_input_name]["asset"]["assetId"]
    ainfo = get_asset_information(aid)
    return f'{ainfo["assetname"]}:{ainfo["assetversion"]}'


def default_json_handler(data):
    if isinstance(data, FeatureMetadata):
        meta_dict = data.__dict__
        type_name = type(data).__name__
        meta_dict[data_type] = type_name
        return meta_dict
    return None


def default_object_hook(dict):
    if data_type in dict and dict[data_type] == FeatureMetadata.__name__:
        del dict[data_type]
        return FeatureMetadata(**dict)
    return dict


def get_arg(args, arg_name: str, custom_parser, allow_none: bool) -> Any:
    try:
        return get_from_args(args, arg_name, custom_parser, allow_none)
    except ValueError as e:
        raise UserConfigError(
            f"Unable to parse {arg_name} from {args}. "
            f"Please check that {args} is valid input and that {arg_name} "
            "exists. For example, a json string with an unquoted string "
            f"value or key can cause this error. Raw parsing error: {e}"
        )

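# A round-trip sketch for the JSON helpers above (illustrative only, not
# executed; the FeatureMetadata arguments are an assumption for the example).
# default_json_handler stashes the FeatureMetadata type name under the
# "data_type" key so default_object_hook can rebuild the object on load:
#
#     meta = FeatureMetadata(categorical_features=["state"])
#     s = json.dumps({"feature_metadata": meta}, default=default_json_handler)
#     restored = json.loads(s, object_hook=default_object_hook)
#     # restored["feature_metadata"] is a FeatureMetadata instance again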