azext_iot/central/services/file_upload.py (97 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import requests from typing import Union from knack.log import get_logger from azext_iot.constants import CENTRAL_ENDPOINT from azext_iot.central.services import _utility from azext_iot.central.common import API_VERSION from azext_iot.central.models.ga_2022_07_31 import FileUploadGa from azure.cli.core.util import should_disable_connection_verify logger = get_logger(__name__) BASE_PATH = "api/fileUploads" MODEL = "FileUpload" def _make_call( cmd, app_id: str, method: str, token: str, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> Union[dict, FileUploadGa]: url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH) headers = _utility.get_headers(token, cmd) # Construct parameters query_parameters = {} query_parameters["api-version"] = api_version response = requests.request( url=url, method=method.upper(), headers=headers, params=query_parameters, verify=not should_disable_connection_verify(), ) return _utility.try_extract_result(response) def get_fileupload( cmd, app_id: str, token: str, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> FileUploadGa: """ Get fileupload info Args: cmd: command passed into az app_id: name of app (used for forming request URL) token: (OPTIONAL) authorization token to fetch fileupload details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: fileupload: dict """ api_version = API_VERSION result = _make_call( cmd, app_id, "get", token=token, api_version=api_version, central_dns_suffix=central_dns_suffix, ) return _utility.get_object(result, MODEL, api_version) def delete_fileupload( cmd, app_id: str, token: str, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT ) -> FileUploadGa: """ Delete file upload storage configuration Args: cmd: command passed into az app_id: name of app (used for forming request URL) token: (OPTIONAL) authorization token to fetch fileupload details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: fileupload: dict """ api_version = API_VERSION result = _make_call( cmd, app_id, "delete", token=token, api_version=api_version, central_dns_suffix=central_dns_suffix, ) return result def createorupdate_fileupload( cmd, app_id: str, connection_string: str, container: str, account: str, sasTtl: bool, token: str, api_version=API_VERSION, update=False, central_dns_suffix=CENTRAL_ENDPOINT, ) -> FileUploadGa: """ Create the file upload storage account configuration. Args: cmd: command passed into az app_id: name of app (used for forming request URL) connection_string: The connection string used to configure the storage account container: The name of the container inside the storage account account: (optional) The storage account name where to upload the file to sasTtl: (optional) ISO 8601 duration standard, The amount of time the device’s request to upload a file is valid before it expires. token: (OPTIONAL) authorization token to fetch file upload details from IoTC. MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...') central_dns_suffix: {centralDnsSuffixInPath} as found in docs Returns: fileupload: dict """ api_version = API_VERSION url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH) headers = _utility.get_headers(token, cmd, has_json_payload=True) # Construct parameters query_parameters = {} query_parameters["api-version"] = api_version payload = {} if connection_string: payload["connectionString"] = connection_string if container: payload["container"] = container if account: payload["account"] = account if sasTtl: payload["sasTtl"] = sasTtl if update: response = requests.patch( url, headers=headers, json=payload, params=query_parameters ) else: response = requests.put( url, headers=headers, json=payload, params=query_parameters ) result = _utility.try_extract_result(response) return _utility.get_object(result, MODEL, api_version)