azext_iot/digitaltwins/common.py (54 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- """ shared: Define shared data types(enums) and constant strings. """ from enum import Enum # Retry constants MAX_ADT_CREATE_RETRIES = 5 ADT_CREATE_RETRY_AFTER = 60 MAX_ADT_DH_CREATE_RETRIES = 20 # Identity params SYSTEM_IDENTITY = "[system]" LRO_TIMER = 60 # Data History strings ADX_DEFAULT_TABLE = "AdtPropertyEvents" DT_INSTANCE = "Digital Twins instance" USER_ASSIGNED_IDENTITY = "User Assigned Identity" DT_SYS_IDENTITY_ERROR = "Digital Twins instance does not have System-Assigned Identity enabled. Please enable and try again." DT_UAI_IDENTITY_ERROR = "Digital Twins instance does not have the given User Assigned Identity associated. Please add the \ identity and try again." FINISHED_CHECK_RESOURCE_LOG_MSG = "Finished checking the {0} resource." ERROR_PREFIX = "Unable to" FAIL_GENERIC_MSG = ERROR_PREFIX + " assign {0}. Please assign this role manually." FAIL_RBAC_MSG = ERROR_PREFIX + " assign {0}. Please assign this role manually with the command `az {1}`." ABORT_MSG = "Command was aborted." CONT_INPUT_MSG = "Continue with Data History connection creation anyway?" ADX_ROLE_MSG = "'Database Admin' permission on the {0} for the Azure Data Explorer database '{1}'" RBAC_ROLE_MSG = "'{0}' role on the {1} for the scope '{2}'" # Messages to be used with ADX_ROLE_MSG or RBAC_ROLE_MSG # Example: "Trying to add the '{0}' role on the Digital Twins instance for the scope '{1}'. TRY_ADD_ROLE_LOG_MSG = "Trying to add the {0}." PRESENT_ADD_ROLE_LOG_MSG = "The {0} is already present." FINISHED_ADD_ROLE_LOG_MSG = "Finished adding the {0}." ADD_ROLE_INPUT_MSG = "Add the {0}?" SKIP_ADD_ROLE_MSG = "Skipping addition of the {0}. This may prevent creation of the data history connection." # Default Event Hub Consumer Group DEFAULT_CONSUMER_GROUP = "$Default" # Models create MAX_MODELS_PER_BATCH = 30 # Enums class ADTEndpointType(Enum): """ ADT endpoint type. """ eventgridtopic = "eventgridtopic" servicebus = "servicebus" eventhub = "eventhub" class ADTEndpointAuthType(Enum): """ ADT endpoint auth type. """ identitybased = "IdentityBased" keybased = "KeyBased" class ADTPrivateConnectionStatusType(Enum): """ ADT private endpoint connection status type. """ pending = "Pending" approved = "Approved" rejected = "Rejected" disconnected = "Disconnected" class ADTPublicNetworkAccessType(Enum): """ ADT private endpoint connection status type. """ enabled = "Enabled" disabled = "Disabled" class ProvisioningStateType(Enum): """ ARM poller provisioning states """ FINISHED = frozenset(['succeeded', 'canceled', 'failed']) FAILED = frozenset(['canceled', 'failed']) SUCCEEDED = frozenset(['succeeded']) class ADTModelCreateFailurePolicy(Enum): """ Batched model creation failure policies """ ROLLBACK = "Rollback" NONE = "None" class IdentityType(Enum): """ Type of managed identity for the Digital Twin. """ system_assigned = "SystemAssigned" user_assigned = "UserAssigned" system_assigned_user_assigned = "SystemAssigned,UserAssigned" none = "None"