azext_iot/central/providers/scheduled_job_provider.py (127 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import List
from knack.log import get_logger
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central import services as central_services
# Module-level logger obtained from knack and named after this module.
logger = get_logger(__name__)
class CentralScheduledJobProvider:
    """
    Provider wrapping the scheduled job calls in ``central_services.scheduled_job``.

    The provider remembers the command context, app id, API version and
    optional token supplied at construction time and forwards them, together
    with the per-call arguments, to the matching service function.
    """

    def __init__(self, cmd, app_id: str, api_version: str, token=None):
        """
        Provider for scheduled job APIs

        Args:
            cmd: command passed into az
            app_id: name of app (used for forming request URL)
            api_version: API version (appended to request URL)
            token: (OPTIONAL) authorization token to fetch API token details from IoTC.
                MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
                Useful in scenarios where user doesn't own the app
                therefore AAD token won't work, but a SAS token generated by owner will
        """
        self._cmd = cmd
        self._app_id = app_id
        self._token = token
        self._api_version = api_version

    def _request_context(self, central_dns_suffix) -> dict:
        """
        Build the keyword arguments that every service call in this provider shares.

        Args:
            central_dns_suffix: DNS suffix forwarded unchanged to the service call

        Returns:
            dict with the keys cmd, app_id, token, api_version and central_dns_suffix
        """
        return {
            "cmd": self._cmd,
            "app_id": self._app_id,
            "token": self._token,
            "api_version": self._api_version,
            "central_dns_suffix": central_dns_suffix,
        }

    def get_scheduled_job(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """
        Forward to ``central_services.scheduled_job.get_scheduled_job``.

        Args:
            job_id: identifier of the scheduled job
            central_dns_suffix: DNS suffix passed through to the service call

        Returns:
            Whatever the service call returns.
        """
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.get_scheduled_job(job_id=job_id, **shared)

    def list_scheduled_jobs(
        self,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> List[dict]:
        """
        Forward to ``central_services.scheduled_job.list_scheduled_jobs``.

        Args:
            central_dns_suffix: DNS suffix passed through to the service call

        Returns:
            Whatever the service call returns.
        """
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.list_scheduled_jobs(**shared)

    def create_scheduled_job(
        self,
        job_id,
        group_id,
        schedule,
        content,
        job_name=None,
        description=None,
        batch_percentage=None,
        threshold_percentage=None,
        threshold_batch=None,
        batch=None,
        threshold=None,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ):
        """
        Forward to ``central_services.scheduled_job.create_scheduled_job``.

        Every job argument is handed to the service call under the same
        keyword name it has here; nothing is validated or transformed in
        this method.

        Returns:
            Whatever the service call returns.
        """
        job_fields = {
            "job_id": job_id,
            "job_name": job_name,
            "group_id": group_id,
            "content": content,
            "schedule": schedule,
            "description": description,
            "batch_percentage": batch_percentage,
            "threshold_percentage": threshold_percentage,
            "threshold_batch": threshold_batch,
            "batch": batch,
            "threshold": threshold,
        }
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.create_scheduled_job(
            **shared, **job_fields
        )

    def update_scheduled_job(
        self,
        job_id,
        group_id=None,
        schedule=None,
        content=None,
        job_name=None,
        description=None,
        batch_percentage=None,
        threshold_percentage=None,
        threshold_batch=None,
        batch=None,
        threshold=None,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ):
        """
        Forward to ``central_services.scheduled_job.update_scheduled_job``.

        Only ``job_id`` is required here; the remaining job arguments default
        to None and are passed to the service call as given.

        Returns:
            Whatever the service call returns.
        """
        job_fields = {
            "job_id": job_id,
            "job_name": job_name,
            "group_id": group_id,
            "content": content,
            "schedule": schedule,
            "description": description,
            "batch_percentage": batch_percentage,
            "threshold_percentage": threshold_percentage,
            "threshold_batch": threshold_batch,
            "batch": batch,
            "threshold": threshold,
        }
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.update_scheduled_job(
            **shared, **job_fields
        )

    def delete_scheduled_job(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """
        Forward to ``central_services.scheduled_job.delete_scheduled_job``.

        Args:
            job_id: identifier of the scheduled job
            central_dns_suffix: DNS suffix passed through to the service call

        Returns:
            Whatever the service call returns.
        """
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.delete_scheduled_job(
            job_id=job_id, **shared
        )

    def list_runs(
        self,
        job_id: str,
        central_dns_suffix=CENTRAL_ENDPOINT,
    ) -> dict:
        """
        Forward to ``central_services.scheduled_job.list_runs``.

        Args:
            job_id: identifier of the scheduled job
            central_dns_suffix: DNS suffix passed through to the service call

        Returns:
            Whatever the service call returns.
        """
        shared = self._request_context(central_dns_suffix)
        return central_services.scheduled_job.list_runs(job_id=job_id, **shared)