airavata-api/airavata-client-sdks/airavata-python-sdk/airavata_experiments/airavata.py (579 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging from pathlib import Path from typing import Literal, NamedTuple from airavata_sdk.transport.settings import APIServerSettings from .sftp import SFTPConnector import time import warnings import requests from urllib.parse import urlparse import uuid import os import base64 import jwt from airavata.model.security.ttypes import AuthzToken from airavata.model.experiment.ttypes import ExperimentModel, ExperimentType, UserConfigurationDataModel from airavata.model.scheduling.ttypes import ComputationalResourceSchedulingModel from airavata.model.data.replica.ttypes import DataProductModel, DataProductType, DataReplicaLocationModel, ReplicaLocationCategory from airavata_sdk.clients.api_server_client import APIServerClient warnings.filterwarnings("ignore", category=DeprecationWarning) logger = logging.getLogger("airavata_sdk.clients") logger.setLevel(logging.INFO) LaunchState = NamedTuple("LaunchState", [ ("experiment_id", str), ("agent_ref", str), ("process_id", str), ("mount_point", Path), ("experiment_dir", str), ("sr_host", str), ]) class Settings: def __init__(self, config_path: str) -> None: import configparser config = configparser.ConfigParser() config.read(config_path) # api server client settings self.API_SERVER_HOST = config.get('APIServer', 'API_HOST') self.API_SERVER_PORT = config.getint('APIServer', 'API_PORT') self.API_SERVER_SECURE = config.getboolean('APIServer', 'API_SECURE') self.CONNECTION_SVC_URL = config.get('APIServer', 'CONNECTION_SVC_URL') self.FILEMGR_SVC_URL = config.get('APIServer', 'FILEMGR_SVC_URL') # gateway settings self.GATEWAY_ID = config.get('Gateway', 'GATEWAY_ID') self.GATEWAY_URL = config.get('Gateway', 'GATEWAY_URL') self.GATEWAY_DATA_STORE_DIR = config.get('Gateway', 'GATEWAY_DATA_STORE_DIR') self.STORAGE_RESOURCE_HOST = config.get('Gateway', 'STORAGE_RESOURCE_HOST') self.SFTP_PORT = config.get('Gateway', 'SFTP_PORT') class AiravataOperator: def register_input_file( self, file_identifier: str, storage_name: str, storageId: str, gateway_id: str, input_file_name: str, uploaded_storage_path: str, ) -> str: dataProductModel = DataProductModel( gatewayId=gateway_id, ownerName=self.user_id, productName=file_identifier, dataProductType=DataProductType.FILE, replicaLocations=[ DataReplicaLocationModel( replicaName="{} gateway data store copy".format(input_file_name), replicaLocationCategory=ReplicaLocationCategory.GATEWAY_DATA_STORE, storageResourceId=storageId, filePath="file://{}:{}".format(storage_name, uploaded_storage_path + input_file_name), )], ) return self.api_server_client.register_data_product(self.airavata_token, dataProductModel) # type: ignore def create_experiment_model( self, project_name: str, application_name: str, experiment_name: str, description: str, gateway_id: str, ) -> ExperimentModel: execution_id = self.get_app_interface_id(application_name) project_id = self.get_project_id(project_name) return ExperimentModel( experimentName=experiment_name, gatewayId=gateway_id, userName=self.user_id, description=description, projectId=project_id, experimentType=ExperimentType.SINGLE_APPLICATION, executionId=execution_id ) def get_resource_host_id(self, resource_name): resources: dict = self.api_server_client.get_all_compute_resource_names(self.airavata_token) # type: ignore return next((str(k) for k, v in resources.items() if v == resource_name)) def configure_computation_resource_scheduling( self, experiment_model: ExperimentModel, computation_resource_name: str, group: str, storageId: str, node_count: int, total_cpu_count: int, queue_name: str, wall_time_limit: int, experiment_dir_path: str, auto_schedule=False, ) -> ExperimentModel: resource_host_id = self.get_resource_host_id(computation_resource_name) groupResourceProfileId = self.get_group_resource_profile_id(group) computRes = ComputationalResourceSchedulingModel() computRes.resourceHostId = resource_host_id computRes.nodeCount = node_count computRes.totalCPUCount = total_cpu_count computRes.queueName = queue_name computRes.wallTimeLimit = wall_time_limit userConfigData = UserConfigurationDataModel() userConfigData.computationalResourceScheduling = computRes userConfigData.groupResourceProfileId = groupResourceProfileId userConfigData.storageId = storageId userConfigData.experimentDataDir = experiment_dir_path userConfigData.airavataAutoSchedule = auto_schedule experiment_model.userConfigurationData = userConfigData return experiment_model def __init__(self, access_token: str, config_file: str = "settings.ini"): # store variables self.access_token = access_token self.settings = Settings(config_file) # load api server settings and create client api_server_settings = APIServerSettings(config_file) self.api_server_client = APIServerClient(api_server_settings=api_server_settings) # load gateway settings gateway_id = self.default_gateway_id() self.airavata_token = self.__airavata_token__(self.access_token, gateway_id) def default_gateway_id(self): return self.settings.GATEWAY_ID def default_gateway_data_store_dir(self): return self.settings.GATEWAY_DATA_STORE_DIR def default_sftp_port(self): return self.settings.SFTP_PORT def default_sr_hostname(self): return self.settings.STORAGE_RESOURCE_HOST def connection_svc_url(self): return self.settings.CONNECTION_SVC_URL def filemgr_svc_url(self): return self.settings.FILEMGR_SVC_URL def __airavata_token__(self, access_token: str, gateway_id: str): """ Decode access token (string) and create AuthzToken (object) """ decode = jwt.decode(access_token, options={"verify_signature": False}) self.user_id = str(decode["preferred_username"]) claimsMap = {"userName": self.user_id, "gatewayID": gateway_id} return AuthzToken(accessToken=self.access_token, claimsMap=claimsMap) def get_experiment(self, experiment_id: str): """ Get experiment by id """ return self.api_server_client.get_experiment(self.airavata_token, experiment_id) def get_process_id(self, experiment_id: str) -> str: """ Get process id by experiment id """ tree: any = self.api_server_client.get_detailed_experiment_tree(self.airavata_token, experiment_id) # type: ignore processModels: list = tree.processes assert len(processModels) == 1, f"Expected 1 process model, got {len(processModels)}" return processModels[0].processId def get_accessible_apps(self, gateway_id: str | None = None): """ Get all applications available in the gateway """ # use defaults for missing values gateway_id = gateway_id or self.default_gateway_id() # logic app_interfaces = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) return app_interfaces def get_preferred_storage(self, gateway_id: str | None = None, sr_hostname: str | None = None): """ Get preferred storage resource """ # use defaults for missing values gateway_id = gateway_id or self.default_gateway_id() sr_hostname = sr_hostname or self.default_sr_hostname() # logic sr_names: dict[str, str] = self.api_server_client.get_all_storage_resource_names(self.airavata_token) # type: ignore sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname)) return self.api_server_client.get_gateway_storage_preference(self.airavata_token, gateway_id, sr_id) def get_storage(self, storage_name: str | None = None) -> any: # type: ignore """ Get storage resource by name """ # use defaults for missing values storage_name = storage_name or self.default_sr_hostname() # logic sr_names: dict[str, str] = self.api_server_client.get_all_storage_resource_names(self.airavata_token) # type: ignore sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name)) storage = self.api_server_client.get_storage_resource(self.airavata_token, sr_id) return storage def get_group_resource_profile_id(self, group: str) -> str: """ Get group resource profile id by name """ # logic grps: list = self.api_server_client.get_group_resource_list(self.airavata_token, self.default_gateway_id()) # type: ignore grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group)) return str(grp_id) def get_group_resource_profile(self, group_id: str): grp: any = self.api_server_client.get_group_resource_profile(self.airavata_token, group_id) # type: ignore return grp def get_compatible_deployments(self, app_interface_id: str, group: str): """ Get compatible deployments for an application interface and group resource profile """ # logic grps: list = self.api_server_client.get_group_resource_list(self.airavata_token, self.default_gateway_id()) # type: ignore grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group)) deployments = self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token, app_interface_id, grp_id) return deployments def get_app_interface_id(self, app_name: str, gateway_id: str | None = None): """ Get application interface id by name """ gateway_id = str(gateway_id or self.default_gateway_id()) apps: list = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) # type: ignore app_id = next((app.applicationInterfaceId for app in apps if app.applicationName == app_name)) return str(app_id) def get_project_id(self, project_name: str, gateway_id: str | None = None): gateway_id = str(gateway_id or self.default_gateway_id()) projects: list = self.api_server_client.get_user_projects(self.airavata_token, gateway_id, self.user_id, 10, 0) # type: ignore project_id = next((p.projectID for p in projects if p.name == project_name and p.owner == self.user_id)) return str(project_id) def get_application_inputs(self, app_interface_id: str) -> list: """ Get application inputs by id """ return list(self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id)) # type: ignore def get_compute_resources_by_ids(self, resource_ids: list[str]): """ Get compute resources by ids """ return [self.api_server_client.get_compute_resource(self.airavata_token, resource_id) for resource_id in resource_ids] def make_experiment_dir(self, sr_host: str, project_name: str, experiment_name: str) -> str: """ Make experiment directory on storage resource, and return the remote path Return Path: /{project_name}/{experiment_name} """ host = sr_host port = self.default_sftp_port() sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) remote_path = sftp_connector.mkdir(project_name, experiment_name) logger.info("Experiment directory created at %s", remote_path) return remote_path def upload_files(self, process_id: str | None, agent_ref: str | None, sr_host: str, local_files: list[Path], remote_dir: str) -> list[str]: """ Upload local files to a remote directory of a storage resource TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ # step = experiment staging if process_id is None and agent_ref is None: host = sr_host port = self.default_sftp_port() sftp_connector = SFTPConnector(host=host, port=int(port), username=self.user_id, password=self.access_token) paths = sftp_connector.put(local_files, remote_dir) logger.info(f"{len(paths)} Local files uploaded to remote dir: %s", remote_dir) return paths # step = post-staging file upload elif process_id is not None and agent_ref is not None: assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}" file = local_files[0] fp = os.path.join("/data", file.name) rawdata = file.read_bytes() b64data = base64.b64encode(rawdata).decode() res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ "agentId": agent_ref, "workingDir": ".", "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"] }) data = res.json() if data["error"] is not None: if str(data["error"]) == "Agent not found": port = self.default_sftp_port() sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) paths = sftp_connector.put(local_files, remote_dir) return paths else: raise Exception(data["error"]) else: exc_id = data["executionId"] while True: res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") data = res.json() if data["available"]: return [fp] time.sleep(1) # step = unknown else: raise ValueError("Invalid arguments for upload_files") # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" file = local_files[0] url_path = os.path.join(process_id, file.name) filemgr_svc_upload_url = f"{self.filemgr_svc_url()}/upload/live/{url_path}" def list_files(self, process_id: str, agent_ref: str, sr_host: str, remote_dir: str) -> list[str]: """ List files in a remote directory of a storage resource TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ "agentId": agent_ref, "workingDir": ".", "arguments": ["sh", "-c", r"find /data -type d -name 'venv' -prune -o -type f -printf '%P\n' | sort"] }) data = res.json() if data["error"] is not None: if str(data["error"]) == "Agent not found": port = self.default_sftp_port() sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) return sftp_connector.ls(remote_dir) else: raise Exception(data["error"]) else: exc_id = data["executionId"] while True: res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") data = res.json() if data["available"]: files = data["responseString"].split("\n") return files time.sleep(1) # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" filemgr_svc_ls_url = f"{self.filemgr_svc_url()}/list/live/{process_id}" def download_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: str, remote_dir: str, local_dir: str) -> str: """ Download files from a remote directory of a storage resource to a local directory TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ import os fp = os.path.join("/data", remote_file) res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ "agentId": agent_ref, "workingDir": ".", "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] }) data = res.json() if data["error"] is not None: if str(data["error"]) == "Agent not found": port = self.default_sftp_port() fp = os.path.join(remote_dir, remote_file) sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) path = sftp_connector.get(fp, local_dir) return path else: raise Exception(data["error"]) else: exc_id = data["executionId"] while True: res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") data = res.json() if data["available"]: content = data["responseString"] import base64 content = base64.b64decode(content) path = Path(local_dir) / remote_file with open(path, "wb") as f: f.write(content) return path.as_posix() time.sleep(1) # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" url_path = os.path.join(process_id, remote_file) filemgr_svc_download_url = f"{self.filemgr_svc_url()}/download/live/{url_path}" def cat_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: str, remote_dir: str) -> bytes: """ Download files from a remote directory of a storage resource to a local directory TODO add data_svc fallback Return Path: /{project_name}/{experiment_name} """ import os fp = os.path.join("/data", remote_file) res = requests.post(f"{self.connection_svc_url()}/agent/executecommandrequest", json={ "agentId": agent_ref, "workingDir": ".", "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] }) data = res.json() if data["error"] is not None: if str(data["error"]) == "Agent not found": port = self.default_sftp_port() fp = os.path.join(remote_dir, remote_file) sftp_connector = SFTPConnector(host=sr_host, port=int(port), username=self.user_id, password=self.access_token) data = sftp_connector.cat(fp) return data else: raise Exception(data["error"]) else: exc_id = data["executionId"] while True: res = requests.get(f"{self.connection_svc_url()}/agent/executecommandresponse/{exc_id}") data = res.json() if data["available"]: content = data["responseString"] import base64 content = base64.b64decode(content) return content time.sleep(1) # file manager service fallback assert process_id is not None, f"Expected process_id, got {process_id}" url_path = os.path.join(process_id, remote_file) filemgr_svc_download_url = f"{self.filemgr_svc_url()}/download/live/{url_path}" def launch_experiment( self, experiment_name: str, project: str, app_name: str, inputs: dict[str, dict[str, str | int | float | list[str]]], computation_resource_name: str, queue_name: str, node_count: int, cpu_count: int, walltime: int, group: str = "Default", *, gateway_id: str | None = None, sr_host: str | None = None, auto_schedule: bool = False, ) -> LaunchState: """ Launch an experiment and return its id """ # preprocess args (str) print("[AV] Preprocessing args...") gateway_id = str(gateway_id or self.default_gateway_id()) sr_host = str(sr_host or self.default_sr_hostname()) mount_point = Path(self.default_gateway_data_store_dir()) / self.user_id server_url = urlparse(self.connection_svc_url()).netloc # validate args (str) print("[AV] Validating args...") assert len(experiment_name) > 0, f"Invalid experiment_name: {experiment_name}" assert len(app_name) > 0, f"Invalid app_name: {app_name}" assert len(computation_resource_name) > 0, f"Invalid computation_resource_name: {computation_resource_name}" assert len(inputs) > 0, f"Invalid inputs: {inputs}" assert len(gateway_id) > 0, f"Invalid gateway_id: {gateway_id}" assert len(queue_name) > 0, f"Invalid queue_name: {queue_name}" assert len(group) > 0, f"Invalid group name: {group}" assert len(sr_host) > 0, f"Invalid sr_host: {sr_host}" assert len(project) > 0, f"Invalid project_name: {project}" assert len(mount_point.as_posix()) > 0, f"Invalid mount_point: {mount_point}" # validate args (int) assert node_count > 0, f"Invalid node_count: {node_count}" assert cpu_count > 0, f"Invalid cpu_count: {cpu_count}" assert walltime > 0, f"Invalid walltime: {walltime}" # parse and validate inputs file_inputs = dict[str, Path | list[Path]]() data_inputs = dict[str, str | int | float]() for input_name, input_spec in inputs.items(): input_type = input_spec["type"] input_value = input_spec["value"] if input_type == "uri": assert isinstance(input_value, str) and os.path.isfile(str(input_value)), f"Invalid {input_name}: {input_value}" file_inputs[input_name] = Path(input_value) elif input_type == "uri[]": assert isinstance(input_value, list) and all([os.path.isfile(str(v)) for v in input_value]), f"Invalid {input_name}: {input_value}" file_inputs[input_name] = [Path(v) for v in input_value] else: assert isinstance(input_value, (int, float, str)), f"Invalid {input_name}: {input_value}" data_inputs[input_name] = input_value data_inputs.update({"agent_id": data_inputs.get("agent_id", str(uuid.uuid4()))}) data_inputs.update({"server_url": server_url}) # setup runtime params print("[AV] Setting up runtime params...") storage = self.get_storage(sr_host) sr_id = storage.storageResourceId # setup application interface print("[AV] Setting up application interface...") app_interface_id = self.get_app_interface_id(app_name) assert app_interface_id is not None, f"Invalid app_interface_id: {app_interface_id}" # setup experiment print("[AV] Setting up experiment...") experiment = self.create_experiment_model( experiment_name=experiment_name, application_name=app_name, project_name=project, description=experiment_name, gateway_id=gateway_id, ) # setup experiment directory print("[AV] Setting up experiment directory...") exp_dir = self.make_experiment_dir( sr_host=storage.hostName, project_name=project, experiment_name=experiment_name, ) abs_path = (mount_point / exp_dir.lstrip("/")).as_posix().rstrip("/") + "/" print("[AV] exp_dir:", exp_dir) print("[AV] abs_path:", abs_path) experiment = self.configure_computation_resource_scheduling( experiment_model=experiment, computation_resource_name=computation_resource_name, group=group, storageId=sr_id, node_count=node_count, total_cpu_count=cpu_count, wall_time_limit=walltime, queue_name=queue_name, experiment_dir_path=abs_path, auto_schedule=auto_schedule, ) def register_input_file(file: Path) -> str: return str(self.register_input_file(file.name, sr_host, sr_id, gateway_id, file.name, abs_path)) # set up file inputs print("[AV] Setting up file inputs...") files_to_upload = list[Path]() file_refs = dict[str, str | list[str]]() for key, value in file_inputs.items(): if isinstance(value, Path): files_to_upload.append(value) file_refs[key] = register_input_file(value) elif isinstance(value, list): assert all([isinstance(v, Path) for v in value]), f"Invalid file input value: {value}" files_to_upload.extend(value) file_refs[key] = [*map(register_input_file, value)] else: raise ValueError("Invalid file input type") # configure experiment inputs experiment_inputs = [] for exp_input in self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id): # type: ignore assert exp_input.type is not None if exp_input.type < 3 and exp_input.name in data_inputs: value = data_inputs[exp_input.name] if exp_input.type == 0: exp_input.value = str(value) else: exp_input.value = repr(value) elif exp_input.type == 3 and exp_input.name in file_refs: ref = file_refs[exp_input.name] assert isinstance(ref, str) exp_input.value = ref elif exp_input.type == 4 and exp_input.name in file_refs: exp_input.value = ','.join(file_refs[exp_input.name]) experiment_inputs.append(exp_input) experiment.experimentInputs = experiment_inputs # configure experiment outputs outputs = self.api_server_client.get_application_outputs(self.airavata_token, app_interface_id) experiment.experimentOutputs = outputs # upload file inputs for experiment print(f"[AV] Uploading {len(files_to_upload)} file inputs for experiment...") self.upload_files(None, None, storage.hostName, files_to_upload, exp_dir) # create experiment ex_id = self.api_server_client.create_experiment(self.airavata_token, gateway_id, experiment) ex_id = str(ex_id) print(f"[AV] Experiment {experiment_name} CREATED with id: {ex_id}") # launch experiment self.api_server_client.launch_experiment(self.airavata_token, ex_id, gateway_id) print(f"[AV] Experiment {experiment_name} STARTED with id: {ex_id}") # wait until experiment begins, then get process id print(f"[AV] Experiment {experiment_name} WAITING until experiment begins...") process_id = None while process_id is None: try: process_id = self.get_process_id(ex_id) except: time.sleep(2) else: time.sleep(2) print(f"[AV] Experiment {experiment_name} EXECUTING with pid: {process_id}") # wait until task begins, then get job id print(f"[AV] Experiment {experiment_name} WAITING until task begins...") job_id = job_state = None while job_state is None: try: job_id, job_state = self.get_task_status(ex_id) except: time.sleep(2) else: time.sleep(2) print(f"[AV] Experiment {experiment_name} - Task {job_state} with id: {job_id}") return LaunchState( experiment_id=ex_id, agent_ref=str(data_inputs["agent_id"]), process_id=process_id, mount_point=mount_point, experiment_dir=exp_dir, sr_host=storage.hostName, ) def get_experiment_status(self, experiment_id: str) -> Literal["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"]: states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"] status: any = self.api_server_client.get_experiment_status(self.airavata_token, experiment_id) # type: ignore return states[status.state] def stop_experiment(self, experiment_id: str): status = self.api_server_client.terminate_experiment( self.airavata_token, experiment_id, self.default_gateway_id()) return status def execute_py(self, project: str, libraries: list[str], code: str, agent_id: str, pid: str, runtime_args: dict, cold_start: bool = True) -> str | None: # lambda to send request print(f"[av] Attempting to submit to agent {agent_id}...") make_request = lambda: requests.post(f"{self.connection_svc_url()}/agent/executepythonrequest", json={ "libraries": libraries, "code": code, "pythonVersion": "3.10", # TODO verify "keepAlive": False, # TODO verify "parentExperimentId": "/data", # the working directory "agentId": agent_id, }) try: if cold_start: res = make_request() data = res.json() if data["error"] == "Agent not found": # waiting for agent to be available print(f"[av] Agent {agent_id} not found! Relaunching...") self.launch_experiment( experiment_name="Agent", app_name="AiravataAgent", project=project, inputs={ "agent_id": {"type": "str", "value": agent_id}, "server_url": {"type": "str", "value": urlparse(self.connection_svc_url()).netloc}, "process_id": {"type": "str", "value": pid}, }, computation_resource_name=runtime_args["cluster"], queue_name=runtime_args["queue_name"], node_count=1, cpu_count=runtime_args["cpu_count"], walltime=runtime_args["walltime"], group=runtime_args["group"], ) return self.execute_py(project, libraries, code, agent_id, pid, runtime_args, cold_start=False) elif data["executionId"] is not None: print(f"[av] Submitted to Python Interpreter") # agent response exc_id = data["executionId"] else: # unrecoverable error raise Exception(data["error"]) else: # poll until agent is available while True: res = make_request() data = res.json() if data["error"] == "Agent not found": # print(f"[av] Waiting for Agent {agent_id}...") time.sleep(2) continue elif data["executionId"] is not None: print(f"[av] Submitted to Python Interpreter") exc_id = data["executionId"] break else: raise Exception(data["error"]) assert exc_id is not None, f"Invalid execution id: {exc_id}" # wait for the execution response to be available while True: res = requests.get(f"{self.connection_svc_url()}/agent/executepythonresponse/{exc_id}") data = res.json() if data["available"]: response = str(data["responseString"]) return response time.sleep(1) except Exception as e: print(f"[av] Remote execution failed! {e}") return None def get_available_runtimes(self): from .runtime import Remote return [ Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=10, gpu_count=1, walltime=30, group="Default"), Remote(cluster="login.expanse.sdsc.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=10, gpu_count=0, walltime=30, group="Default"), Remote(cluster="anvil.rcac.purdue.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=24, gpu_count=0, walltime=30, group="Default"), Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=10, gpu_count=1, walltime=30, group="GaussianGroup"), Remote(cluster="login.expanse.sdsc.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=10, gpu_count=0, walltime=30, group="GaussianGroup"), Remote(cluster="anvil.rcac.purdue.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=24, gpu_count=0, walltime=30, group="GaussianGroup"), ] def get_task_status(self, experiment_id: str) -> tuple[str, Literal["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE", "COMPLETE", "CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"] | None]: states = ["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE", "COMPLETE", "CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"] job_details: dict = self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) # type: ignore job_id = job_state = None # get the most recent job id and state for job_id, v in job_details.items(): if v.reason in states: job_state = v.reason else: job_state = states[int(v.jobState)] return job_id or "N/A", job_state # type: ignore