azext_iot/digitaltwins/providers/deletion_job.py (34 lines of code) (raw):

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import Optional
from azext_iot.common.utility import handle_service_exception
from azext_iot.digitaltwins.providers.base import DigitalTwinsProvider
from azext_iot.digitaltwins.providers import ErrorResponseException
from azext_iot.common.embedded_cli import EmbeddedCLI
from knack.log import get_logger
from uuid import uuid4

# Prefix used for job ids generated by `DeletionJobProvider.create` when the
# caller does not supply one.
DEFAULT_DELETE_JOB_ID_PREFIX = "delete-job-"
logger = get_logger(__name__)


class DeletionJobProvider(DigitalTwinsProvider):
    """Provider exposing get / list / create for Digital Twins deletion jobs.

    All calls go through the ``delete_jobs`` attribute of the SDK object
    returned by ``get_sdk()``. Any ``ErrorResponseException`` raised by an SDK
    call is passed to ``handle_service_exception``.
    """

    def __init__(self, cmd, name: str, rg: str = None):
        """Initialise the base provider and bind the SDK client and embedded CLI.

        Args:
            cmd: CLI command object; its ``cli_ctx`` is used to build the
                embedded CLI.
            name: Forwarded to the base provider as ``name``.
            rg: Forwarded to the base provider as ``rg``.
        """
        super().__init__(cmd=cmd, name=name, rg=rg)
        self.sdk = self.get_sdk().delete_jobs
        self.cli = EmbeddedCLI(cli_ctx=cmd.cli_ctx)

    def get(self, job_id: str):
        """Return the deletion job looked up by ``job_id``.

        Args:
            job_id: Identifier passed to the SDK's ``get_by_id``.
        """
        try:
            job = self.sdk.get_by_id(job_id)
        except ErrorResponseException as e:
            handle_service_exception(e)
        else:
            return job

    def list(self, top: int = None):  # top is guarded for int() in arg def
        """Return the SDK's listing of deletion jobs.

        Args:
            top: Value used as ``max_items_per_page`` in the list options.
        """
        # Imported locally, as in the original implementation.
        from azext_iot.sdk.digitaltwins.dataplane.models import DeleteJobsListOptions

        page_options = DeleteJobsListOptions(max_items_per_page=top)

        # NOTE(review): the keyword below is `import_jobs_list_options` even
        # though the value is a DeleteJobsListOptions. Confirm it matches the
        # parameter name of the delete_jobs `list` operation in the SDK; if it
        # does not, `top` may not be taking effect. TODO confirm.
        try:
            jobs = self.sdk.list(import_jobs_list_options=page_options)
        except ErrorResponseException as e:
            handle_service_exception(e)
        else:
            return jobs

    def create(self, job_id: Optional[str] = None, timeout_in_min: Optional[int] = None):
        """Start a deletion job and return the SDK's response without polling.

        Args:
            job_id: Operation id for the job. When falsy, an id of the form
                ``delete-job-<uuid4 without dashes>`` is generated.
            timeout_in_min: Value stored as ``timeout_in_minutes`` on the SDK
                config before the request is made.
        """
        if not job_id:
            job_id = DEFAULT_DELETE_JOB_ID_PREFIX + str(uuid4()).replace("-", "")

        # The id and timeout are handed to the SDK through its config object
        # rather than as arguments to `add`.
        self.sdk.config.operation_id = job_id
        self.sdk.config.timeout_in_minutes = timeout_in_min

        try:
            created = self.sdk.add(polling=False)
        except ErrorResponseException as e:
            handle_service_exception(e)
        else:
            return created