azext_iot/central/services/scheduled_job.py (186 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import List
import requests
from knack.log import get_logger
from azure.cli.core.azclierror import AzureResponseError
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.services import _utility
from azext_iot.central.models.ga_2022_07_31 import ScheduledJobGa
from azext_iot.central.common import API_VERSION
logger = get_logger(__name__)
BASE_PATH = "api/scheduledJobs"
MODEL = "ScheduledJob"
def list_scheduled_jobs(
cmd,
app_id: str,
token: str,
api_version=API_VERSION,
max_pages=0,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[ScheduledJobGa]:
"""
Get a list of all scheduled jobs.
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch device details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
list of scheduled jobs
"""
api_version = API_VERSION
scheduled_jobs = []
url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH)
headers = _utility.get_headers(token, cmd)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
pages_processed = 0
while (max_pages == 0 or pages_processed < max_pages) and url:
response = requests.get(url, headers=headers, params=query_parameters)
result = _utility.try_extract_result(response)
if "value" not in result:
raise AzureResponseError("Value is not present in body: {}".format(result))
for scheduled_job in result["value"]:
scheduled_jobs.append(ScheduledJobGa(scheduled_job))
url = result.get("nextLink", None)
pages_processed = pages_processed + 1
return scheduled_jobs
def get_scheduled_job(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> ScheduledJobGa:
"""
Get a specific scheduled job.
Args:
cmd: command passed into az
job_id: case sensitive scheduled job id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch device details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
scheduled_job: dict
"""
api_version = API_VERSION
result = _utility.make_api_call(
cmd,
app_id=app_id,
method="GET",
url="https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id),
payload=None,
token=token,
api_version=api_version,
central_dnx_suffix=central_dns_suffix,
)
return _utility.get_object(result, model=MODEL, api_version=api_version)
def create_scheduled_job(
cmd,
app_id: str,
job_id: str,
group_id: str,
content: list,
schedule: str,
job_name: str,
description: str,
batch_percentage: bool,
threshold_percentage: bool,
threshold_batch: bool,
batch: int,
threshold: int,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> ScheduledJobGa:
"""
Creates a scheduled job.
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
job_id: unique case-sensitive job id
group_id: The ID of the device group on which to execute the job.
content: Data related to the operation being performed by this job.
schedule: The schedule at which to execute the job.
job_name: (OPTIONAL)(non-unique) human readable name for the job
description: (OPTIONAL) Detailed description of the job.
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
scheduled_job: dict
"""
api_version = API_VERSION
if not job_name:
job_name = job_id
url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id)
headers = _utility.get_headers(token, cmd, has_json_payload=True)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
payload = {"displayName": job_name, "group": group_id, "data": content, "schedule": schedule}
if description:
payload["description"] = description
if batch is not None:
payload["batch"] = {
"value": batch,
"type": "percentage" if batch_percentage else "number",
}
if threshold is not None:
payload["cancellationThreshold"] = {
"value": threshold,
"type": "percentage" if threshold_percentage else "number",
"batch": threshold_batch,
}
response = requests.put(url, headers=headers, json=payload, params=query_parameters)
result = _utility.try_extract_result(response)
return _utility.get_object(result, model=MODEL, api_version=api_version)
def update_scheduled_job(
cmd,
app_id: str,
job_id: str,
group_id: str,
content: list,
schedule: str,
job_name: str,
description: str,
batch_percentage: bool,
threshold_percentage: bool,
threshold_batch: bool,
batch: int,
threshold: int,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> ScheduledJobGa:
"""
Updates a scheduled job.
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
job_id: unique case-sensitive job id
group_id: The ID of the device group on which to execute the job.
content: Data related to the operation being performed by this job.
schedule: The schedule at which to execute the job.
job_name: (OPTIONAL)(non-unique) human readable name for the job.
description: (OPTIONAL) Detailed description of the job.
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
scheduled_job: dict
"""
api_version = API_VERSION
if not job_name:
job_name = job_id
url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id)
headers = _utility.get_headers(token, cmd, has_json_payload=True)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
payload = {}
if job_name:
payload["displayName"] = job_name
if group_id:
payload["group"] = group_id
if content:
payload["data"] = content
if schedule:
payload["schedule"] = schedule
if description:
payload["description"] = description
if batch is not None:
payload["batch"] = {
"value": batch,
"type": "percentage" if batch_percentage else "number",
}
if threshold is not None:
payload["cancellationThreshold"] = {
"value": threshold,
"type": "percentage" if threshold_percentage else "number",
"batch": threshold_batch,
}
response = requests.patch(url, headers=headers, json=payload, params=query_parameters)
result = _utility.try_extract_result(response)
return _utility.get_object(result, model=MODEL, api_version=api_version)
def delete_scheduled_job(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
"""
Delete a scheduled job.
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
job_id: case sensitive scheduled job id,
token: (OPTIONAL) authorization token to fetch device details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
scheduled_job: dict
"""
api_version = API_VERSION
return _utility.make_api_call(
cmd,
app_id=app_id,
method="DELETE",
url="https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id),
payload=None,
token=token,
api_version=api_version,
central_dnx_suffix=central_dns_suffix,
)
def list_runs(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
"""
Get the list of job instances for a scheduled job definition.
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
job_id: case sensitive scheduled job id,
token: (OPTIONAL) authorization token to fetch device details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
scheduled_job: dict
"""
api_version = API_VERSION
return _utility.make_api_call(
cmd,
app_id=app_id,
method="GET",
url="https://{}.{}/{}/{}/jobs".format(app_id, central_dns_suffix, BASE_PATH, job_id),
payload=None,
token=token,
api_version=api_version,
central_dnx_suffix=central_dns_suffix,
)