ADTTools/RecreateAdtInstance/main.py (120 lines of code) (raw):
import sys
import time
from jlog import logger
from jlog import ProgressBar
from jlog import Spinner
from azure.mgmt.digitaltwins import AzureDigitalTwinsManagementClient
from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import ClientAuthenticationError
from azure.mgmt.digitaltwins.v2022_05_31.models import DigitalTwinsResource
from azure.mgmt.digitaltwins.v2022_05_31.models import DigitalTwinsEndpointResource
from azure.mgmt.digitaltwins.v2022_05_31.models import EventHub
from azure.mgmt.digitaltwins.v2022_05_31.models import EventGrid
from azure.mgmt.digitaltwins.v2022_05_31.models import ServiceBus
from azure.identity import DefaultAzureCredential
def map_endpoint(resource):
props = resource.properties
match props.endpoint_type:
case "EventHub":
return EventHub(
authentication_type=props.authentication_type,
endpoint_uri=props.endpoint_uri,
entity_path=props.entity_path
)
case "EventGrid":
return EventGrid(
topic_endpoint=props.topic_endpoint,
access_key1=props.access_key1,
authentication_type=props.authentication_type
)
case "ServiceBus":
return ServiceBus(
authentication_type=props.authentication_type,
endpoint_uri=props.endpoint_uri,
entity_path=props.entity_path
)
def delete_digital_twins_instance():
delete_start = round(time.time() * 1000)
delete_instance = dt_resource_client.digital_twins.begin_delete(resource_group, instance_name)
cycles = 0
progress_bar = ProgressBar.new(total=10_000, msg="Deleting DigitalTwins Instance")
while not delete_instance.done():
if cycles < 9_900:
time.sleep(0.1)
progress_bar.update(25)
cycles += 25
delete_end = round(time.time() * 1000)
progress_bar.update(10_000 - cycles)
progress_bar.close()
return delete_end - delete_start
def create_digital_twins_instance():
logger.info("Creating new DigitalTwins instance %s", instance_name)
dt_resource = DigitalTwinsResource(location=instance.location, identity=instance.identity)
create_start = round(time.time() * 1000)
create_instance = dt_resource_client.digital_twins.begin_create_or_update(resource_group, instance_name, dt_resource)
cycles = 0
progress_bar = ProgressBar.new(total=15_000, msg="Creating DigitalTwins Instance")
while not create_instance.done():
if cycles < 14_850:
time.sleep(0.1)
progress_bar.update(25)
cycles += 25
create_end = round(time.time() * 1000)
progress_bar.update(15_000 - cycles)
progress_bar.close()
return create_end - create_start
def create_endpoints():
num_endpoints = len(endpoints)
logger.info("Recreating %d endpoint(s).", num_endpoints)
start_time = round(time.time() * 1000)
updated_endpoints = list()
for endpoint in endpoints:
updated_endpoint = dt_resource_client.digital_twins_endpoint.begin_create_or_update(
resource_group_name=resource_group,
resource_name=instance_name,
endpoint_name=endpoint.name,
endpoint_description=DigitalTwinsEndpointResource(properties=map_endpoint(endpoint))
)
updated_endpoints.append(updated_endpoint)
spinner = Spinner.new("Creating Endpoints.")
spinner.start()
num_complete = 0
while True:
for index in range(0, num_endpoints):
current_request = updated_endpoints[index]
if current_request is None:
continue
if current_request.done():
updated_endpoints[index] = None
num_complete += 1
spinner.text = f"Creating Endpoints. {num_complete}/{num_endpoints}"
if num_complete is num_endpoints:
break
time.sleep(0.1)
end_time = round(time.time() * 1000)
spinner.stop()
return end_time - start_time
if len(sys.argv) < 4:
logger.error(
'ADT instance name must be provided as arg[1]. SubscriptionId must be provided as arg[2] Exiting. ResourceGroup as arg[3]')
exit()
instance_name = sys.argv[1]
subscription_id = sys.argv[2]
resource_group = sys.argv[3]
api_version = "2022-05-31"
credential_provider = DefaultAzureCredential()
dt_resource_client = AzureDigitalTwinsManagementClient(credential_provider, subscription_id, api_version)
instance = None
try:
instance = dt_resource_client.digital_twins.get(resource_group, instance_name)
except (HttpResponseError, ClientAuthenticationError) as e:
logger.exception(e)
exit()
logger.info("Successfully retrieved %s.", instance.name)
endpoints = list(dt_resource_client.digital_twins_endpoint.list(resource_group, instance_name))
logger.info("Deleted DigitalTwins instance in %d milliseconds.", delete_digital_twins_instance())
logger.info("Created DigitalTwins instance in %d milliseconds", create_digital_twins_instance())
if len(endpoints) > 0:
logger.info("Created endpoints in %d milliseconds", create_endpoints())
logger.info("Done.")
dt_resource_client.close()