azext_iot/iothub/providers/job.py (199 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from time import sleep
from datetime import datetime, timedelta
from knack.log import get_logger
from azure.cli.core.azclierror import (
CLIInternalError,
InvalidArgumentValueError,
RequiredArgumentMissingError,
)
from azext_iot.common.shared import SdkType, JobStatusType, JobType, JobVersionType
from azext_iot.common.utility import handle_service_exception, process_json_arg
from azext_iot.operations.generic import _execute_query
from azext_iot.iothub.providers.base import IoTHubProvider, CloudError, SerializationError
# Module-level logger, named after this module.
logger = get_logger(__name__)
class JobProvider(IoTHubProvider):
    """Get, cancel, list and create IoT Hub jobs through the service SDK.

    Two job API versions are used: the "v2" scheduled-job operations and the
    "v1" import/export-job operations. v1 results are converted to the v2
    dictionary layout by ``_convert_v1_to_v2`` so callers see a single shape.
    """

    # Job types that are handled through the v1 (import/export) job API.
    _V1_JOB_TYPES = (JobType.exportDevices.value, JobType.importDevices.value)

    def get(self, job_id):
        """Return the job identified by ``job_id`` as a dictionary.

        The v2 API is queried first. When it reports the status 'unknown',
        the job is fetched again through the v1 API and that result is
        returned instead.
        """
        job_result = self._get(job_id)
        if job_result.get("status") == JobStatusType.unknown.value:
            # Replace 'unknown' v2 result with v1 result
            job_result = self._get(job_id, JobVersionType.v1)
        return job_result

    def _get(self, job_id, job_version=JobVersionType.v2):
        """Fetch a single job using the API selected by ``job_version``.

        v2 results are the raw JSON response body; v1 results are converted
        to the v2 layout. A ``CloudError`` is handed to
        ``handle_service_exception``.
        """
        service_sdk = self.get_sdk(SdkType.service_sdk)
        try:
            if job_version == JobVersionType.v2:
                response = service_sdk.jobs.get_scheduled_job(id=job_id, raw=True)
                return response.response.json()
            v1_job = service_sdk.jobs.get_import_export_job(id=job_id)
            return self._convert_v1_to_v2(v1_job)
        except CloudError as e:
            handle_service_exception(e)

    def cancel(self, job_id):
        """Cancel the job identified by ``job_id``.

        The job is looked up first so that import/export jobs are cancelled
        through the v1 API and every other job through the v2 API.
        """
        job_result = self.get(job_id)
        if job_result.get("type") in self._V1_JOB_TYPES:
            # v1 Job
            return self._cancel(job_id, JobVersionType.v1)
        # v2 Job
        return self._cancel(job_id)

    def _cancel(self, job_id, job_version=JobVersionType.v2):
        """Issue the cancel call against the API selected by ``job_version``.

        The v2 branch returns the raw JSON response body; the v1 branch
        returns whatever the SDK call returns. A ``CloudError`` is handed to
        ``handle_service_exception``.
        """
        service_sdk = self.get_sdk(SdkType.service_sdk)
        try:
            if job_version == JobVersionType.v2:
                response = service_sdk.jobs.cancel_scheduled_job(id=job_id, raw=True)
                return response.response.json()
            return service_sdk.jobs.cancel_import_export_job(id=job_id)
        except CloudError as e:
            handle_service_exception(e)

    def list(self, job_type=None, job_status=None, top=None):
        """Return jobs, optionally restricted by type, status and count.

        Args:
            job_type: Job type value to filter on; falsy means all types.
            job_status: Job status value to filter on; falsy means all.
            top: Maximum number of jobs to return; falsy means no limit.

        Returns:
            A list of job dictionaries in the v2 layout.
        """
        jobs_collection = []
        wants_v1_type = job_type in self._V1_JOB_TYPES

        # The v2 query is skipped only when a v1-only job type was requested.
        if not wants_v1_type:
            jobs_collection.extend(
                self._list(job_type=job_type, job_status=job_status, top=top)
            )

        # v1 jobs are added when they can match the type filter and the v2
        # results have not already satisfied 'top'.
        v1_may_match = wants_v1_type or not job_type
        has_room = not top or len(jobs_collection) < top
        if v1_may_match and has_room:
            jobs_collection.extend(self._list(job_version=JobVersionType.v1))

            # v1 API has no means of filtering service side :(
            jobs_collection = self._filter_jobs(
                jobs=jobs_collection, job_type=job_type, job_status=job_status
            )

            # Trim based on top, since there is no way to pass a 'top' into the v1 API :(
            if top:
                jobs_collection = jobs_collection[:top]

        return jobs_collection

    def _list(self, job_type=None, job_status=None, top=None, job_version=JobVersionType.v2):
        """List jobs from the API selected by ``job_version``.

        For v2, ``job_type``, ``job_status`` and ``top`` are forwarded to
        ``_execute_query``. For v1, every import/export job is fetched and
        converted to the v2 layout; the filter arguments are not used.
        A ``CloudError`` is handed to ``handle_service_exception``.
        """
        service_sdk = self.get_sdk(SdkType.service_sdk)
        jobs_collection = []
        try:
            if job_version == JobVersionType.v2:
                query = [job_type, job_status]
                query_method = service_sdk.jobs.query_scheduled_jobs
                jobs_collection.extend(_execute_query(query, query_method, top))
            elif job_version == JobVersionType.v1:
                jobs_collection = [
                    self._convert_v1_to_v2(job)
                    for job in service_sdk.jobs.get_import_export_jobs()
                ]
            return jobs_collection
        except CloudError as e:
            handle_service_exception(e)

    def create(
        self,
        job_id,
        job_type,
        start_time=None,
        query_condition=None,
        twin_patch=None,
        method_name=None,
        method_payload=None,
        method_connect_timeout=30,
        method_response_timeout=30,
        ttl=3600,
        wait=False,
        poll_interval=10,
        poll_duration=600,
    ):
        """Create a scheduled (v2) job and optionally wait for it to finish.

        Args:
            job_id: Identifier of the job to create.
            job_type: Job type value; twin-update and device-method jobs get
                extra validation and a type-specific payload.
            start_time: Forwarded as the job request's ``start_time``.
            query_condition: Device query for the job. Required for
                twin-update and device-method jobs.
            twin_patch: JSON argument holding the twin patch; must resolve to
                a dict. Required for twin-update jobs.
            method_name: Direct method name. Required for device-method jobs.
            method_payload: JSON argument holding the method payload.
            method_connect_timeout: Method connect timeout in seconds.
            method_response_timeout: Method response timeout in seconds.
            ttl: Maximum job execution time in seconds.
            wait: When true, poll the job until it is completed, failed or
                cancelled, or until ``poll_duration`` has elapsed.
            poll_interval: Seconds to sleep between polls.
            poll_duration: Total seconds to keep polling.

        Returns:
            The job as a dictionary: the creation response, or the most
            recently polled state when ``wait`` is set.

        Raises:
            RequiredArgumentMissingError: A required argument for the job
                type is missing.
            InvalidArgumentValueError: A poll setting is below 1 or the twin
                patch is not an object.
            CLIInternalError: The request could not be serialized.
        """
        from azext_iot.sdk.iothub.service.models import (
            CloudToDeviceMethod,
            JobRequest
        )

        is_twin_job = job_type == JobType.scheduleUpdateTwin.value
        is_method_job = job_type == JobType.scheduleDeviceMethod.value

        if (is_twin_job or is_method_job) and not query_condition:
            raise RequiredArgumentMissingError(
                "The query condition is required when job type is {} or {}. "
                "Use query condition '*' if you need to run job on all devices.".format(
                    JobType.scheduleUpdateTwin.value, JobType.scheduleDeviceMethod.value
                )
            )

        if poll_duration < 1:
            raise InvalidArgumentValueError("--poll-duration must be greater than 0.")

        if poll_interval < 1:
            raise InvalidArgumentValueError("--poll-interval must be greater than 0.")

        if is_twin_job:
            if not twin_patch:
                raise RequiredArgumentMissingError(
                    "The {} job type requires --twin-patch.".format(
                        JobType.scheduleUpdateTwin.value
                    )
                )

            twin_patch = process_json_arg(twin_patch, argument_name="twin-patch")
            if not isinstance(twin_patch, dict):
                raise InvalidArgumentValueError(
                    "Twin patches must be objects. Received type: {}".format(
                        type(twin_patch)
                    )
                )
        elif is_method_job:
            if not method_name:
                raise RequiredArgumentMissingError(
                    "The {} job type requires --method-name.".format(
                        JobType.scheduleDeviceMethod.value
                    )
                )

            method_payload = process_json_arg(
                method_payload, argument_name="method-payload"
            )

        job_request = JobRequest(
            job_id=job_id,
            type=job_type,
            start_time=start_time,
            max_execution_time_in_seconds=ttl,
            query_condition=query_condition,
        )

        if is_twin_job:
            # scheduleUpdateTwin job type is a force update, which only accepts '*' as the Etag.
            twin_patch["etag"] = "*"
            job_request.update_twin = twin_patch
        elif is_method_job:
            job_request.cloud_to_device_method = CloudToDeviceMethod(
                method_name=method_name,
                connect_timeout_in_seconds=method_connect_timeout,
                response_timeout_in_seconds=method_response_timeout,
                payload=method_payload,
            )

        service_sdk = self.get_sdk(SdkType.service_sdk)
        try:
            job_result = service_sdk.jobs.create_scheduled_job(
                id=job_id, job_request=job_request, raw=True
            ).response.json()
            if wait:
                job_result = self._poll_until_finished(
                    job_id, poll_interval=poll_interval, poll_duration=poll_duration
                )
            return job_result
        except CloudError as e:
            handle_service_exception(e)
        except SerializationError as se:
            # ISO8601 parsing is handled by msrest
            raise CLIInternalError(se) from se

    def _poll_until_finished(self, job_id, poll_interval, poll_duration):
        """Poll a v2 job until it finishes or ``poll_duration`` seconds pass.

        The job is fetched immediately and then once per ``poll_interval``
        seconds. Returns the most recently fetched job dictionary.
        """
        from time import monotonic

        finished_states = (
            JobStatusType.completed.value,
            JobStatusType.failed.value,
            JobStatusType.cancelled.value,
        )

        logger.info("Waiting for job finished state...")
        # A monotonic clock keeps the deadline immune to wall-clock changes
        # (DST transitions, manual or NTP adjustments) while polling.
        deadline = monotonic() + poll_duration
        while True:
            job_result = self._get(job_id)
            if "status" in job_result:
                refreshed_job_status = job_result["status"]
                logger.info("Refreshed job status: '%s'", refreshed_job_status)
                if refreshed_job_status in finished_states:
                    break

            if monotonic() > deadline:
                logger.info("Job not completed within poll duration....")
                break

            logger.info("Waiting %d seconds for next refresh...", poll_interval)
            sleep(poll_interval)
        return job_result

    def _convert_v1_to_v2(self, job_v1):
        """Map a v1 job object onto a dictionary using the v2 key names.

        ``failureReason`` is only present when the v1 job has a truthy
        failure reason. Entries of ``additional_properties`` are merged in
        last and therefore override the mapped keys on collision.
        """
        v2_result = {
            # For v1 jobs, startTime is the same as createdTime
            "createdTime": job_v1.start_time_utc,
            "startTime": job_v1.start_time_utc,
            "endTime": job_v1.end_time_utc,
            "jobId": job_v1.job_id,
            "status": job_v1.status,
            "type": job_v1.type,
            "progress": job_v1.progress,
            "excludeKeysInExport": job_v1.exclude_keys_in_export,
        }

        if job_v1.failure_reason:
            v2_result["failureReason"] = job_v1.failure_reason

        # Tolerate a missing (None) additional_properties mapping.
        v2_result.update(job_v1.additional_properties or {})
        return v2_result

    def _filter_jobs(self, jobs, job_type=None, job_status=None):
        """Return the jobs matching ``job_type`` and ``job_status``.

        A falsy filter value disables that filter. Each job is expected to be
        a mapping with "type" and "status" keys.
        """
        if job_type:
            jobs = [job for job in jobs if job["type"] == job_type]

        if job_status:
            jobs = [job for job in jobs if job["status"] == job_status]

        return jobs