azext_iot/central/commands_scheduled_job.py (148 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Dev note - think of this as a controller
from typing import List, Optional
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.providers import CentralScheduledJobProvider
from azext_iot.common import utility
from azext_iot.central.common import API_VERSION
from azext_iot.central.models.ga_2022_07_31 import ScheduledJobGa
def get_scheduled_job(
cmd,
app_id: str,
job_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> ScheduledJobGa:
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.get_scheduled_job(
job_id=job_id,
central_dns_suffix=central_dns_suffix,
)
def list_scheduled_jobs(
cmd,
app_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> List[ScheduledJobGa]:
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.list_scheduled_jobs(central_dns_suffix=central_dns_suffix)
def create_scheduled_job(
cmd,
app_id: str,
job_id: str,
group_id: str,
schedule: str,
content: str,
job_name: Optional[str] = None,
description: Optional[str] = None,
batch_type: Optional[str] = None,
threshold_type: Optional[str] = None,
threshold_batch: Optional[bool] = None,
batch: Optional[int] = None,
threshold: Optional[int] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> ScheduledJobGa:
payload = utility.process_json_arg(content, argument_name="content")
schedule = utility.process_json_arg(schedule, argument_name="schedule")
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, api_version=api_version, token=token
)
return provider.create_scheduled_job(
job_id=job_id,
job_name=job_name,
group_id=group_id,
content=payload,
schedule=schedule,
description=description,
batch_percentage=True
if batch_type.lower() == "percentage"
else False,
threshold_percentage=True
if threshold_type.lower() == "percentage"
else False,
threshold_batch=threshold_batch,
batch=batch,
threshold=threshold,
central_dns_suffix=central_dns_suffix,
)
def update_scheduled_job(
cmd,
app_id: str,
job_id: str,
group_id: Optional[str] = None,
schedule: Optional[str] = None,
content: Optional[str] = None,
job_name: Optional[str] = None,
description: Optional[str] = None,
batch_type: Optional[str] = None,
threshold_type: Optional[str] = None,
threshold_batch: Optional[bool] = None,
batch: Optional[int] = None,
threshold: Optional[int] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> ScheduledJobGa:
payload = None
if content:
payload = utility.process_json_arg(content, argument_name="content")
schedule_payload = None
if schedule:
schedule_payload = utility.process_json_arg(schedule, argument_name="schedule")
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, api_version=api_version, token=token
)
return provider.update_scheduled_job(
job_id=job_id,
job_name=job_name,
group_id=group_id,
content=payload,
schedule=schedule_payload,
description=description,
batch_percentage=True
if batch_type.lower() == "percentage"
else False,
threshold_percentage=True
if threshold_type.lower() == "percentage"
else False,
threshold_batch=threshold_batch,
batch=batch,
threshold=threshold,
central_dns_suffix=central_dns_suffix,
)
def delete_scheduled_job(
cmd,
app_id: str,
job_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.delete_scheduled_job(
job_id=job_id,
central_dns_suffix=central_dns_suffix,
)
def list_runs(
cmd,
app_id: str,
job_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralScheduledJobProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.list_runs(
job_id=job_id,
central_dns_suffix=central_dns_suffix,
)