azext_iot/dps/services/global_service.py (20 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- # This is for calls that route to the global DPS # Useful for when you don't know what the dps name is ahead of time # E.g. most IoT Central scenarios import requests from azext_iot import constants from azext_iot.dps.services import auth def get_registration_state(id_scope: str, key: str, device_id: str): """ Gets device registration state from global dps endpoint Usefule for when dps name is unknown Params: id_scope: dps id_scope key: either primary or secondary symmetric key device_id: device id that uniquely identifies the device Returns: DeviceRegistrationState: dict ProvisioningServiceErrorDetails: dict """ authToken = auth.get_dps_sas_auth_header(id_scope, device_id, key) url = "https://global.azure-devices-provisioning.net/{}/registrations/{}?api-version=2019-03-31".format( id_scope, device_id ) header_parameters = { "Content-Type": "application/json", "User-Agent": constants.USER_AGENT, "Authorization": authToken, } body = {"registrationId": "{}".format(device_id)} response = requests.post(url, headers=header_parameters, json=body) try: response.raise_for_status() return response.json() except Exception as e: return {"error": str(e), "device_id": device_id}