in xlml/apis/task.py [0:0]
def run_with_existing_instance(self) -> DAGNode:
"""Run a test job via existing instance.
Returns:
A task group with the following tasks chained: provision, run_model and post_process, clean_up.
"""
with TaskGroup(
group_id=self.task_test_config.benchmark_id, prefix_group_id=True
) as group:
(
provision,
ip_address,
ssh_keys,
gcs_location,
) = self.provision_via_existing_instance()
if (
self.task_metric_config
and self.task_metric_config.use_runtime_generated_gcs_folder
):
env_variable = {
f"{metric_config.SshEnvVars.GCS_OUTPUT.name}": gcs_location
}
else:
env_variable = None
post_process = self.post_process(gcs_location)
run_model = self.run_model(ip_address, ssh_keys, env_variable)
clean_up = self.clean_up_existing_instance(ssh_keys)
provision >> run_model >> post_process >> clean_up
return group