azext_iot/central/services/destination.py (144 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import List, Union from knack.log import get_logger from azure.cli.core.azclierror import AzureResponseError from azext_iot.constants import CENTRAL_ENDPOINT from azext_iot.central.models.v2022_06_30_preview import ( DestinationPreview, WebhookDestinationPreview, AdxDestinationPreview, ) from azext_iot.central.services import _utility from azext_iot.central.common import API_VERSION_PREVIEW logger = get_logger(__name__) BASE_PATH = "api/dataExport/destinations" def add_destination( cmd, app_id: str, destination_id: str, payload, token: str, api_version=API_VERSION_PREVIEW, central_dns_suffix=CENTRAL_ENDPOINT, ) -> Union[ DestinationPreview, WebhookDestinationPreview, AdxDestinationPreview ]: """ Add an data export destinations to IoT Central app Args: cmd: command passed into az app_id: name of app (used for forming request URL) destination_id: unique identifier of destination payload: destination JSON definition token: (OPTIONAL) authorization token to fetch device details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: Destination """ api_version = API_VERSION_PREVIEW url = "https://{}.{}/{}/{}".format( app_id, central_dns_suffix, BASE_PATH, destination_id ) return _utility.make_api_call( cmd, app_id, method="PUT", url=url, payload=payload, token=token, api_version=api_version, central_dnx_suffix=central_dns_suffix, ) def update_destination( cmd, app_id: str, destination_id: str, payload: str, token: str, api_version=API_VERSION_PREVIEW, central_dns_suffix=CENTRAL_ENDPOINT, ) -> Union[ DestinationPreview, WebhookDestinationPreview, AdxDestinationPreview ]: """ Update an data export destination in IoT Central app Args: cmd: command passed into az app_id: name of app (used for forming request URL) destination_id: unique identifier for destination payload: destination JSON definition token: (OPTIONAL) authorization token to fetch device details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: Destination """ api_version = API_VERSION_PREVIEW url = "https://{}.{}/{}/{}".format( app_id, central_dns_suffix, BASE_PATH, destination_id ) return _utility.make_api_call( cmd, app_id, method="PATCH", url=url, payload=payload, token=token, api_version=api_version, central_dnx_suffix=central_dns_suffix, ) def list_destinations( cmd, app_id: str, token: str, max_pages=0, api_version=API_VERSION_PREVIEW, central_dns_suffix=CENTRAL_ENDPOINT, ) -> List[ Union[ DestinationPreview, WebhookDestinationPreview, AdxDestinationPreview, ] ]: """ Get the list of destinations for a central app. Args: cmd: command passed into az app_id: name of app (used for forming request URL) max_pages: max return result pages token: (OPTIONAL) authorization token to fetch device details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: List of destinations """ api_version = API_VERSION_PREVIEW destinations = [] url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH) pages_processed = 0 while (max_pages == 0 or pages_processed < max_pages) and url: result = _utility.make_api_call( cmd, app_id, method="GET", url=url, payload=None, token=token, api_version=api_version, central_dnx_suffix=central_dns_suffix, ) if "value" not in result: raise AzureResponseError("Value is not present in body: {}".format(result)) destinations.extend(result.get("value", [])) url = result.get("nextLink", None) pages_processed = pages_processed + 1 return destinations def get_destination( cmd, app_id: str, destination_id: str, token: str, api_version=API_VERSION_PREVIEW, central_dns_suffix=CENTRAL_ENDPOINT, ) -> Union[ DestinationPreview, WebhookDestinationPreview, AdxDestinationPreview ]: """ Get information about a specified destination. Args: cmd: command passed into az app_id: name of app (used for forming request URL) destination_id: unique identifier of destination token: (OPTIONAL) authorization token to fetch device details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: Destination """ api_version = API_VERSION_PREVIEW url = "https://{}.{}/{}/{}".format( app_id, central_dns_suffix, BASE_PATH, destination_id ) return _utility.make_api_call( cmd, app_id, method="GET", url=url, payload=None, token=token, api_version=api_version, central_dnx_suffix=central_dns_suffix, ) def delete_destination( cmd, app_id: str, destination_id: str, token: str, api_version=API_VERSION_PREVIEW, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: """ Delete data export destination from Central app. Args: cmd: command passed into az app_id: name of app (used for forming request URL) destination_id: unique identifier of destination token: (OPTIONAL) authorization token to fetch device details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: response dict """ api_version = API_VERSION_PREVIEW url = "https://{}.{}/{}/{}".format( app_id, central_dns_suffix, BASE_PATH, destination_id ) return _utility.make_api_call( cmd, app_id, method="DELETE", url=url, payload=None, token=token, api_version=api_version, central_dnx_suffix=central_dns_suffix, )