# azext_iot/digitaltwins/commands_resource.py (290 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import List, Optional
from azext_iot.digitaltwins.commands_twins import delete_all_twin
from azext_iot.digitaltwins.commands_models import delete_all_models
from azext_iot.digitaltwins.providers.resource import ResourceProvider
from azext_iot.digitaltwins.common import (
ADX_DEFAULT_TABLE,
DEFAULT_CONSUMER_GROUP,
ADTEndpointType,
ADTEndpointAuthType,
ADTPublicNetworkAccessType,
)
from azure.cli.core.azclierror import MutuallyExclusiveArgumentError
from knack.log import get_logger
# Module-level logger obtained from knack's get_logger, keyed by this module's name.
logger = get_logger(__name__)
# Command handler: create a Digital Twins instance via ResourceProvider.create.
#
# Raises MutuallyExclusiveArgumentError when both no_wait and scopes are
# truthy. no_wait is only used for that check; every other argument is
# forwarded by keyword and the provider's result is returned as-is.
def create_instance(
    cmd,
    name: str,
    resource_group_name: str,
    location: Optional[str] = None,
    tags: Optional[List[str]] = None,
    assign_identity: Optional[bool] = None,
    scopes: Optional[List[str]] = None,
    role_type: str = "Contributor",
    public_network_access: str = ADTPublicNetworkAccessType.enabled.value,
    system_identity: Optional[bool] = None,
    user_identities: Optional[List[str]] = None,
    no_wait: bool = False,
):
    # Guard clause: runs before the provider is constructed.
    if no_wait and scopes:
        conflict_message = (
            "Cannot assign scopes in a no wait operation. Please run the command without --no-wait."
        )
        raise MutuallyExclusiveArgumentError(conflict_message)

    provider = ResourceProvider(cmd)
    creation_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "location": location,
        "tags": tags,
        "assign_identity": assign_identity,
        "scopes": scopes,
        "role_type": role_type,
        "public_network_access": public_network_access,
        "system_identity": system_identity,
        "user_identities": user_identities,
    }
    return provider.create(**creation_args)
# Command handler: list Digital Twins instances.
#
# A truthy resource_group_name routes to ResourceProvider.list_by_resouce_group
# (spelling is the provider's own); otherwise ResourceProvider.list is called.
def list_instances(cmd, resource_group_name: Optional[str] = None):
    provider = ResourceProvider(cmd)
    if resource_group_name:
        return provider.list_by_resouce_group(resource_group_name)
    return provider.list()
# Command handler: look up one instance through ResourceProvider.find_instance
# and return the provider's result.
def show_instance(cmd, name: str, resource_group_name: Optional[str] = None):
    return ResourceProvider(cmd).find_instance(
        name=name,
        resource_group_name=resource_group_name,
    )
# Command handler: delete an instance through ResourceProvider.delete and
# return the provider's result.
def delete_instance(cmd, name: str, resource_group_name: Optional[str] = None):
    return ResourceProvider(cmd).delete(
        name=name,
        resource_group_name=resource_group_name,
    )
# Command handler: same lookup as show_instance, but passes wait=True to
# ResourceProvider.find_instance.
def wait_instance(cmd, name: str, resource_group_name: Optional[str] = None):
    lookup_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "wait": True,
    }
    return ResourceProvider(cmd).find_instance(**lookup_args)
# Command handler: reset an instance by calling delete_all_models and then
# delete_all_twin with the same positional arguments. Returns None.
def reset_instance(cmd, name: str, resource_group_name: Optional[str] = None):
    target = (cmd, name, resource_group_name)
    # Call order is preserved: models first, twins second.
    delete_all_models(*target)
    delete_all_twin(*target)
# Command handler: list the endpoints of an instance through
# ResourceProvider.list_endpoints.
def list_endpoints(cmd, name: str, resource_group_name: Optional[str] = None):
    return ResourceProvider(cmd).list_endpoints(
        name=name,
        resource_group_name=resource_group_name,
    )
# Command handler: fetch a single endpoint through ResourceProvider.get_endpoint.
def show_endpoint(
    cmd,
    name: str,
    endpoint_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    endpoint = provider.get_endpoint(
        name=name,
        endpoint_name=endpoint_name,
        resource_group_name=resource_group_name,
    )
    return endpoint
# Command handler: remove an endpoint through ResourceProvider.delete_endpoint
# and return the provider's result.
def delete_endpoint(
    cmd,
    name: str,
    endpoint_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    outcome = provider.delete_endpoint(
        name=name,
        endpoint_name=endpoint_name,
        resource_group_name=resource_group_name,
    )
    return outcome
# Command handler: same lookup as show_endpoint, but passes wait=True to
# ResourceProvider.get_endpoint.
def wait_endpoint(
    cmd,
    name: str,
    endpoint_name: str,
    resource_group_name: Optional[str] = None,
):
    lookup_args = {
        "name": name,
        "endpoint_name": endpoint_name,
        "resource_group_name": resource_group_name,
        "wait": True,
    }
    return ResourceProvider(cmd).get_endpoint(**lookup_args)
# Command handler: add an Event Grid topic endpoint.
#
# Calls ResourceProvider.add_endpoint with the resource type fixed to
# ADTEndpointType.eventgridtopic.value; the topic name and its resource group
# are passed as the generic endpoint_resource_* arguments.
def add_endpoint_eventgrid(
    cmd,
    name: str,
    endpoint_name: str,
    eventgrid_topic_name: str,
    eventgrid_resource_group: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    endpoint_subscription: Optional[str] = None,
    dead_letter_uri: Optional[str] = None,
    dead_letter_secret: Optional[str] = None,
    auth_type: str = ADTEndpointAuthType.keybased.value,
):
    provider = ResourceProvider(cmd)
    endpoint_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "endpoint_name": endpoint_name,
        "endpoint_resource_type": ADTEndpointType.eventgridtopic.value,
        "endpoint_resource_name": eventgrid_topic_name,
        "endpoint_resource_group": eventgrid_resource_group,
        "endpoint_subscription": endpoint_subscription,
        "dead_letter_uri": dead_letter_uri,
        "dead_letter_secret": dead_letter_secret,
        "auth_type": auth_type,
    }
    return provider.add_endpoint(**endpoint_args)
# Command handler: add a Service Bus topic endpoint.
#
# Calls ResourceProvider.add_endpoint with the resource type fixed to
# ADTEndpointType.servicebus.value; topic, namespace, resource group and
# policy are passed as the generic endpoint_resource_* arguments.
def add_endpoint_servicebus(
    cmd,
    name: str,
    endpoint_name: str,
    servicebus_topic_name: str,
    servicebus_namespace: str,
    servicebus_resource_group: Optional[str] = None,
    servicebus_policy: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    endpoint_subscription: Optional[str] = None,
    dead_letter_uri: Optional[str] = None,
    dead_letter_secret: Optional[str] = None,
    auth_type: str = ADTEndpointAuthType.keybased.value,
    system_identity: Optional[bool] = None,
    user_identity: Optional[List[str]] = None,
):
    provider = ResourceProvider(cmd)
    endpoint_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "endpoint_name": endpoint_name,
        "endpoint_resource_type": ADTEndpointType.servicebus.value,
        "endpoint_resource_name": servicebus_topic_name,
        "endpoint_resource_group": servicebus_resource_group,
        "endpoint_resource_namespace": servicebus_namespace,
        "endpoint_resource_policy": servicebus_policy,
        "endpoint_subscription": endpoint_subscription,
        "dead_letter_uri": dead_letter_uri,
        "dead_letter_secret": dead_letter_secret,
        "auth_type": auth_type,
        "system_identity": system_identity,
        "user_identity": user_identity,
    }
    return provider.add_endpoint(**endpoint_args)
# Command handler: add an Event Hub endpoint.
#
# Calls ResourceProvider.add_endpoint with the resource type fixed to
# ADTEndpointType.eventhub.value; hub name, namespace, resource group and
# policy are passed as the generic endpoint_resource_* arguments.
def add_endpoint_eventhub(
    cmd,
    name: str,
    endpoint_name: str,
    eventhub_name: str,
    eventhub_namespace: str,
    eventhub_resource_group: Optional[str] = None,
    eventhub_policy: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    endpoint_subscription: Optional[str] = None,
    dead_letter_uri: Optional[str] = None,
    dead_letter_secret: Optional[str] = None,
    auth_type: str = ADTEndpointAuthType.keybased.value,
    system_identity: Optional[bool] = None,
    user_identity: Optional[List[str]] = None,
):
    provider = ResourceProvider(cmd)
    endpoint_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "endpoint_name": endpoint_name,
        "endpoint_resource_type": ADTEndpointType.eventhub.value,
        "endpoint_resource_name": eventhub_name,
        "endpoint_resource_group": eventhub_resource_group,
        "endpoint_resource_namespace": eventhub_namespace,
        "endpoint_resource_policy": eventhub_policy,
        "endpoint_subscription": endpoint_subscription,
        "dead_letter_uri": dead_letter_uri,
        "dead_letter_secret": dead_letter_secret,
        "auth_type": auth_type,
        "system_identity": system_identity,
        "user_identity": user_identity,
    }
    return provider.add_endpoint(**endpoint_args)
# Command handler: fetch one private link through ResourceProvider.get_private_link.
def show_private_link(
    cmd,
    name: str,
    link_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    link = provider.get_private_link(
        name=name,
        resource_group_name=resource_group_name,
        link_name=link_name,
    )
    return link
# Command handler: list an instance's private links through
# ResourceProvider.list_private_links.
def list_private_links(cmd, name: str, resource_group_name: Optional[str] = None):
    return ResourceProvider(cmd).list_private_links(
        name=name,
        resource_group_name=resource_group_name,
    )
# Command handler: update a private endpoint connection.
#
# Forwards status, description, group_ids and actions_required by keyword to
# ResourceProvider.set_private_endpoint_conn and returns its result.
def set_private_endpoint_conn(
    cmd,
    name: str,
    conn_name: str,
    status: str,
    description: Optional[str] = None,
    group_ids=None,
    actions_required: Optional[str] = None,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    connection_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "conn_name": conn_name,
        "status": status,
        "description": description,
        "group_ids": group_ids,
        "actions_required": actions_required,
    }
    return provider.set_private_endpoint_conn(**connection_args)
# Command handler: fetch one private endpoint connection through
# ResourceProvider.get_private_endpoint_conn.
def show_private_endpoint_conn(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    connection = provider.get_private_endpoint_conn(
        name=name,
        resource_group_name=resource_group_name,
        conn_name=conn_name,
    )
    return connection
# Command handler: list an instance's private endpoint connections through
# ResourceProvider.list_private_endpoint_conns.
def list_private_endpoint_conns(
    cmd,
    name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    connections = provider.list_private_endpoint_conns(
        name=name,
        resource_group_name=resource_group_name,
    )
    return connections
# Command handler: delete a private endpoint connection through
# ResourceProvider.delete_private_endpoint_conn and return the provider's result.
def delete_private_endpoint_conn(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    outcome = provider.delete_private_endpoint_conn(
        name=name,
        resource_group_name=resource_group_name,
        conn_name=conn_name,
    )
    return outcome
# Command handler: same lookup as show_private_endpoint_conn, but passes
# wait=True to ResourceProvider.get_private_endpoint_conn.
def wait_private_endpoint_conn(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
):
    lookup_args = {
        "name": name,
        "resource_group_name": resource_group_name,
        "conn_name": conn_name,
        "wait": True,
    }
    return ResourceProvider(cmd).get_private_endpoint_conn(**lookup_args)
# Command handler: create an ADX data connection.
#
# Every argument is forwarded unchanged, by keyword, to
# ResourceProvider.create_adx_data_connection; the provider's result is returned.
# adx_table_name and eh_consumer_group default to ADX_DEFAULT_TABLE and
# DEFAULT_CONSUMER_GROUP respectively.
def create_adx_data_connection(
    cmd,
    name: str,
    conn_name: str,
    adx_cluster_name: str,
    adx_database_name: str,
    eh_namespace: str,
    eh_entity_path: str,
    adx_table_name: str = ADX_DEFAULT_TABLE,
    adx_twin_lifecycle_events_table_name: Optional[str] = None,
    adx_relationship_lifecycle_events_table_name: Optional[str] = None,
    adx_resource_group: Optional[str] = None,
    adx_subscription: Optional[str] = None,
    eh_consumer_group: str = DEFAULT_CONSUMER_GROUP,
    eh_resource_group: Optional[str] = None,
    eh_subscription: Optional[str] = None,
    user_identity: Optional[str] = None,
    record_property_and_item_removals: bool = False,
    resource_group_name: Optional[str] = None,
    yes: bool = False,
):
    provider = ResourceProvider(cmd)
    connection_args = {
        "name": name,
        "conn_name": conn_name,
        # Azure Data Explorer target.
        "adx_cluster_name": adx_cluster_name,
        "adx_database_name": adx_database_name,
        "adx_table_name": adx_table_name,
        "adx_twin_lifecycle_events_table_name": adx_twin_lifecycle_events_table_name,
        "adx_relationship_lifecycle_events_table_name": adx_relationship_lifecycle_events_table_name,
        "adx_resource_group": adx_resource_group,
        "adx_subscription": adx_subscription,
        # Event Hub settings.
        "eh_namespace": eh_namespace,
        "eh_entity_path": eh_entity_path,
        "eh_consumer_group": eh_consumer_group,
        "eh_resource_group": eh_resource_group,
        "eh_subscription": eh_subscription,
        # Remaining options.
        "user_identity": user_identity,
        "record_property_and_item_removals": record_property_and_item_removals,
        "resource_group_name": resource_group_name,
        "yes": yes,
    }
    return provider.create_adx_data_connection(**connection_args)
# Command handler: fetch one data connection through
# ResourceProvider.get_data_connection.
def show_data_connection(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
):
    provider = ResourceProvider(cmd)
    connection = provider.get_data_connection(
        name=name,
        conn_name=conn_name,
        resource_group_name=resource_group_name,
    )
    return connection
# Command handler: same lookup as show_data_connection, but passes wait=True
# to ResourceProvider.get_data_connection.
def wait_data_connection(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
):
    lookup_args = {
        "name": name,
        "conn_name": conn_name,
        "resource_group_name": resource_group_name,
        "wait": True,
    }
    return ResourceProvider(cmd).get_data_connection(**lookup_args)
# Command handler: list an instance's data connections through
# ResourceProvider.list_data_connection.
def list_data_connection(cmd, name: str, resource_group_name: Optional[str] = None):
    return ResourceProvider(cmd).list_data_connection(
        name=name,
        resource_group_name=resource_group_name,
    )
# Command handler: delete a data connection through
# ResourceProvider.delete_data_connection, forwarding the
# cleanup_connection_artifacts flag unchanged.
def delete_data_connection(
    cmd,
    name: str,
    conn_name: str,
    resource_group_name: Optional[str] = None,
    cleanup_connection_artifacts: bool = False,
):
    deletion_args = {
        "name": name,
        "conn_name": conn_name,
        "resource_group_name": resource_group_name,
        "cleanup_connection_artifacts": cleanup_connection_artifacts,
    }
    return ResourceProvider(cmd).delete_data_connection(**deletion_args)