def cancel_training_job()

in src/hyperpod_cli/service/cancel_training_job.py [0:0]


    def cancel_training_job(self, job_name: str, namespace: Optional[str]):
        """
        Cancel the training job named by the user in the given namespace.

        When ``namespace`` is falsy (``None`` or empty), a namespace is looked up
        via ``DiscoverNamespaces().discover_accessible_namespace`` using a
        resource-attributes template for the ``delete`` verb on the PyTorch
        custom object.

        Args:
            job_name: Name of the training job to delete. It is also used as the
                helm release name for the follow-up ``helm uninstall``.
            namespace: Namespace to delete the job from, or a falsy value to
                have one discovered.

        Returns:
            ``None`` when the delete response reports ``status == "Success"``
            (after attempting the helm release cleanup); otherwise the delete
            response itself, unchanged.

        Raises:
            RuntimeError: If the delete call raises ``ApiException``. The
                original exception is attached as ``__cause__``.
        """

        k8s_client = KubernetesClient()

        if not namespace:
            resource_attributes_template = V1ResourceAttributes(
                verb="delete",
                group=PYTORCH_CUSTOM_OBJECT_GROUP,
                resource=PYTORCH_CUSTOM_OBJECT_PLURAL,
            )
            namespace = DiscoverNamespaces().discover_accessible_namespace(
                resource_attributes_template
            )

        try:
            result = k8s_client.delete_training_job(
                job_name=job_name, namespace=namespace
            )
        except ApiException as e:
            # Chain the original exception so the full API error (body, headers,
            # traceback) stays available to callers and in logs.
            raise RuntimeError(
                f"Unexpected API error: {e.reason} ({e.status})"
            ) from e

        # Anything other than an explicit "Success" status is handed back to the
        # caller untouched; no helm cleanup is attempted in that case.
        if result.get("status") != "Success":
            return result

        helm_chart_cleanup_command = [
            "helm",
            "uninstall",
            job_name,
            "--namespace",
            namespace,
        ]
        # Best-effort cleanup: the helm exit code and output are intentionally
        # not inspected, so a failed uninstall does not fail the cancellation.
        subprocess.run(
            helm_chart_cleanup_command,
            capture_output=True,
            text=True,
            check=False,
        )
        return None