azext_iot/central/services/job.py (237 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import requests
from typing import List, Union
from knack.log import get_logger
from azure.cli.core.azclierror import AzureResponseError
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.common import API_VERSION
from azext_iot.central.services import _utility
from azure.cli.core.util import should_disable_connection_verify
from azext_iot.central.models.ga_2022_07_31 import JobGa
logger = get_logger(__name__)
BASE_PATH = "api/jobs"
def _call_job(
cmd,
method: str,
path: str,
app_id: str,
job_id: str,
body: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> Union[dict, JobGa]:
api_version = API_VERSION
url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id)
headers = _utility.get_headers(token, cmd)
if path is not None:
url = "{}/{}".format(url, path)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
if method is None:
method = "get"
response = requests.request(
method=method.upper(),
url=url,
headers=headers,
params=query_parameters,
verify=not should_disable_connection_verify(),
json=body,
)
result = _utility.try_extract_result(response)
return result
def _list_job(
cmd,
app_id: str,
path: str,
token: str,
api_version=API_VERSION,
max_pages=0,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[JobGa]:
"""
Get a list of all jobs in IoTC app
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
list of jobs
"""
api_version = API_VERSION
values = []
url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH)
headers = _utility.get_headers(token, cmd)
if path is not None:
url = "{}/{}".format(url, path)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
pages_processed = 0
while (max_pages == 0 or pages_processed < max_pages) and url:
response = requests.get(
url,
headers=headers,
params=query_parameters,
verify=not should_disable_connection_verify(),
)
result = _utility.try_extract_result(response)
if "value" not in result:
raise AzureResponseError("Value is not present in body: {}".format(result))
values.extend(result["value"])
url = result.get("nextLink", None)
pages_processed = pages_processed + 1
return values
def get_job(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> JobGa:
"""
Get job info given a job id
Args:
cmd: command passed into az
job_id: unique case-sensitive job id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
result = _call_job(
cmd=cmd,
method="get",
path=None,
app_id=app_id,
job_id=job_id,
body=None,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return _utility.get_object(result, "Job", api_version)
def stop_job(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> JobGa:
"""
Stop a running job
Args:
cmd: command passed into az
job_id: unique case-sensitive job id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
result = _call_job(
cmd=cmd,
method="post",
path="stop",
app_id=app_id,
job_id=job_id,
body=None,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return _utility.get_object(result, "Job", api_version)
def resume_job(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> JobGa:
"""
Resume a stopped job
Args:
cmd: command passed into az
job_id: unique case-sensitive job id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
result = _call_job(
cmd=cmd,
method="post",
path="resume",
app_id=app_id,
job_id=job_id,
body=None,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return _utility.get_object(result, "Job", api_version)
def rerun_job(
cmd,
app_id: str,
job_id: str,
rerun_id: str,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> JobGa:
"""
Rerun a job on failed devices
Args:
cmd: command passed into az
job_id: unique case-sensitive job id,
rerun_id: unique case-sensitive rerun id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
result = _call_job(
cmd=cmd,
method="put",
path="rerun/{}".format(rerun_id),
app_id=app_id,
job_id=job_id,
body=None,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return _utility.get_object(result, "Job", api_version)
def get_job_devices(
cmd,
app_id: str,
job_id: str,
token: str,
api_version=API_VERSION,
max_pages=0,
central_dns_suffix=CENTRAL_ENDPOINT,
):
"""
Get device statuses
Args:
cmd: command passed into az
job_id: unique case-sensitive job id,
app_id: name of app (used for forming request URL)
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
return _list_job(
cmd=cmd,
app_id=app_id,
path="{}/{}".format(job_id, "devices"),
token=token,
max_pages=max_pages,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
def list_jobs(
cmd,
app_id: str,
token: str,
api_version=API_VERSION,
max_pages=0,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[JobGa]:
api_version = API_VERSION
values = _list_job(
cmd=cmd,
app_id=app_id,
path=None,
token=token,
max_pages=max_pages,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return [_utility.get_object(job, "Job", api_version) for job in values]
def create_job(
cmd,
app_id: str,
job_id: str,
group_id: str,
content: list,
job_name: str,
description: str,
batch_percentage: bool,
threshold_percentage: bool,
threshold_batch: bool,
batch: int,
threshold: int,
token: str,
api_version=API_VERSION,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> JobGa:
"""
Create a job in IoTC
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
job_id: unique case-sensitive job id
group_id: The ID of the device group on which to execute the job.
content: see example payload available in
<repo-root>/azext_iot/tests/central/json/job_int_test.json
job_name: (OPTIONAL)(non-unique) human readable name for the job
description: (OPTIONAL) Detailed description of the job.
token: (OPTIONAL) authorization token to fetch job details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
central_dns_suffix: {centralDnsSuffixInPath} as found in docs
Returns:
job: dict
"""
api_version = API_VERSION
if not job_name:
job_name = job_id
url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, job_id)
headers = _utility.get_headers(token, cmd, has_json_payload=True)
# Construct parameters
query_parameters = {}
query_parameters["api-version"] = api_version
payload = {"displayName": job_name, "group": group_id, "data": content}
if description:
payload["description"] = description
if batch is not None:
payload["batch"] = {
"value": batch,
"type": "percentage" if batch_percentage else "number",
}
if threshold is not None:
payload["cancellationThreshold"] = {
"value": threshold,
"type": "percentage" if threshold_percentage else "number",
"batch": threshold_batch,
}
response = requests.put(url, headers=headers, json=payload, params=query_parameters)
result = _utility.try_extract_result(response)
return _utility.get_object(result, "Job", api_version)