in xlml/utils/gpu.py [0:0]
def delete_resource(instance_name: airflow.XComArg, project_id: str, zone: str):
    """Delete a Compute Engine instance and wait until the deletion completes.

    Wires two Airflow tasks together: one issues the delete request, and a
    sensor polls the resulting zone operation until it is no longer RUNNING
    or PENDING.

    Args:
        instance_name: XComArg resolving to the name of the instance to delete.
        project_id: ID of the project that owns the instance.
        zone: Zone in which the instance is located.
    """

    # "all_done" makes the cleanup run once upstream tasks have finished,
    # whether they succeeded or failed.
    @task(trigger_rule="all_done")
    def delete_resource_request(
        instance_name: str, project_id: str, zone: str
    ) -> str:
        """Send the instance delete request and return the operation name."""
        client = compute_v1.InstancesClient()
        request = compute_v1.DeleteInstanceRequest(
            instance=instance_name,
            project=project_id,
            zone=zone,
        )
        operation = client.delete(request=request)
        return operation.name

    # Poke every 60 s for up to 30 min; "reschedule" mode is set so the
    # sensor is rescheduled between pokes.
    @task.sensor(poke_interval=60, timeout=1800, mode="reschedule")
    def wait_for_resource_deletion(operation_name: str) -> bool:
        """Check the delete operation once.

        Returns:
            False while the operation is RUNNING or PENDING, True once it
            has finished without an error.

        Raises:
            Exception: The operation's own exception when it provides one,
                otherwise a RuntimeError carrying the HTTP error message.
        """
        # Retrieves the delete operation to check its status. `project_id`
        # and `zone` come from the enclosing `delete_resource` call.
        client = compute_v1.ZoneOperationsClient()
        request = compute_v1.GetZoneOperationRequest(
            operation=operation_name,
            project=project_id,
            zone=zone,
        )
        operation = client.get(request=request)
        status = operation.status.name

        if status in ("RUNNING", "PENDING"):
            logging.info(
                "Resource deletion status: %s, %s",
                status,
                operation.status_message,
            )
            return False

        if operation.error:
            logging.error(
                "Error during resource deletion: [Code: %s]: %s",
                operation.http_error_status_code,
                operation.http_error_message,
            )
            logging.error("Operation ID: %s", operation.name)
            # NOTE(review): the object returned by ZoneOperationsClient.get()
            # appears to be a plain Operation message without the
            # ExtendedOperation.exception() helper - confirm. Look the helper
            # up defensively so that a missing attribute cannot replace the
            # real failure with an AttributeError.
            make_exception = getattr(operation, "exception", None)
            failure = make_exception() if callable(make_exception) else None
            raise failure or RuntimeError(operation.http_error_message)

        if operation.warnings:
            logging.warning("Warnings during resource deletion:\n")
            for warning in operation.warnings:
                logging.warning(" - %s: %s", warning.code, warning.message)

        return True

    op = delete_resource_request(instance_name, project_id, zone)
    wait_for_resource_deletion(op)