def delete_resource()

in xlml/utils/gpu.py [0:0]


def delete_resource(instance_name: airflow.XComArg, project_id: str, zone: str):
  @task(trigger_rule="all_done")
  def delete_resource_request(
      instance_name: str, project_id: str, zone: str
  ) -> airflow.XComArg:
    client = compute_v1.InstancesClient()
    request = compute_v1.DeleteInstanceRequest(
        instance=instance_name,
        project=project_id,
        zone=zone,
    )
    operation = client.delete(request=request)

    return operation.name

  @task.sensor(poke_interval=60, timeout=1800, mode="reschedule")
  def wait_for_resource_deletion(operation_name: airflow.XComArg):
    # Retrives the delete opeartion to check the status.
    client = compute_v1.ZoneOperationsClient()
    request = compute_v1.GetZoneOperationRequest(
        operation=operation_name,
        project=project_id,
        zone=zone,
    )
    operation = client.get(request=request)
    status = operation.status.name
    if status in ("RUNNING", "PENDING"):
      logging.info(
          f"Resource deletion status: {status}, {operation.status_message}"
      )
      return False
    else:
      if operation.error:
        logging.error(
            (
                "Error during resource deletion: [Code:"
                f" {operation.http_error_status_code}]:"
                f" {operation.http_error_message}"
            ),
        )
        logging.error(f"Operation ID: {operation.name}")
        raise operation.exception() or RuntimeError(
            operation.http_error_message
        )
      elif operation.warnings:
        logging.warning("Warnings during resource deletion:\n")
        for warning in operation.warnings:
          logging.warning(f" - {warning.code}: {warning.message}")
      return True

  op = delete_resource_request(instance_name, project_id, zone)
  wait_for_resource_deletion(op)