azext_iot/central/providers/scheduled_job_provider.py (127 lines of code) (raw):

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import List
from knack.log import get_logger
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central import services as central_services

logger = get_logger(__name__)


class CentralScheduledJobProvider:
    """Thin provider that forwards scheduled-job calls to the service layer.

    Every public method delegates to the function of the same name in
    ``central_services.scheduled_job``, supplying the connection details
    captured at construction time together with the per-call arguments.
    """

    def __init__(self, cmd, app_id: str, api_version: str, token=None):
        """
        Provider for scheduled job APIs

        Args:
            cmd: command passed into az
            app_id: name of app (used for forming request URL)
            api_version: API version (appended to request URL)
            token: (OPTIONAL) authorization token to fetch API token details from IoTC.
                MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
                Useful in scenarios where user doesn't own the app
                therefore AAD token won't work, but a SAS token generated by owner will
        """
        self._cmd = cmd
        self._app_id = app_id
        self._token = token
        self._api_version = api_version

    def _request_context(self, central_dns_suffix) -> dict:
        """Build the keyword arguments shared by every service-layer call."""
        return {
            "cmd": self._cmd,
            "app_id": self._app_id,
            "token": self._token,
            "api_version": self._api_version,
            "central_dns_suffix": central_dns_suffix,
        }

    def get_scheduled_job(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """Forward to ``scheduled_job.get_scheduled_job`` for ``job_id``."""
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.get_scheduled_job(
            job_id=job_id, **context
        )

    def list_scheduled_jobs(
        self,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> List[dict]:
        """Forward to ``scheduled_job.list_scheduled_jobs``."""
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.list_scheduled_jobs(**context)

    def create_scheduled_job(
        self,
        job_id,
        group_id,
        schedule,
        content,
        job_name=None,
        description=None,
        batch_percentage=None,
        threshold_percentage=None,
        threshold_batch=None,
        batch=None,
        threshold=None,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ):
        """Forward to ``scheduled_job.create_scheduled_job``.

        All job arguments are handed to the service layer unchanged, under
        the same keyword names they have in this signature.
        """
        # Collect the job-specific fields separately from the connection
        # details so the two groups stay easy to tell apart.
        job_fields = {
            "job_id": job_id,
            "job_name": job_name,
            "group_id": group_id,
            "content": content,
            "schedule": schedule,
            "description": description,
            "batch_percentage": batch_percentage,
            "threshold_percentage": threshold_percentage,
            "threshold_batch": threshold_batch,
            "batch": batch,
            "threshold": threshold,
        }
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.create_scheduled_job(
            **context, **job_fields
        )

    def update_scheduled_job(
        self,
        job_id,
        group_id=None,
        schedule=None,
        content=None,
        job_name=None,
        description=None,
        batch_percentage=None,
        threshold_percentage=None,
        threshold_batch=None,
        batch=None,
        threshold=None,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ):
        """Forward to ``scheduled_job.update_scheduled_job``.

        Only ``job_id`` is required here; every other job argument defaults
        to ``None`` and is passed through to the service layer as given.
        """
        job_fields = {
            "job_id": job_id,
            "job_name": job_name,
            "group_id": group_id,
            "content": content,
            "schedule": schedule,
            "description": description,
            "batch_percentage": batch_percentage,
            "threshold_percentage": threshold_percentage,
            "threshold_batch": threshold_batch,
            "batch": batch,
            "threshold": threshold,
        }
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.update_scheduled_job(
            **context, **job_fields
        )

    def delete_scheduled_job(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """Forward to ``scheduled_job.delete_scheduled_job`` for ``job_id``."""
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.delete_scheduled_job(
            job_id=job_id, **context
        )

    def list_runs(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """Forward to ``scheduled_job.list_runs`` for ``job_id``."""
        context = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.list_runs(job_id=job_id, **context)