azext_iot/digitaltwins/providers/endpoint/builders.py (309 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional
from azure.cli.core.azclierror import CLIInternalError
from azext_iot.common.embedded_cli import EmbeddedCLI
from azext_iot.digitaltwins.common import SYSTEM_IDENTITY, ADTEndpointAuthType
from abc import ABC, abstractmethod
from knack.log import get_logger
from azext_iot.sdk.digitaltwins.controlplane.models import (
ManagedIdentityReference,
DigitalTwinsIdentityType,
EventGrid as EventGridEndpointProperties,
EventHub as EventHubEndpointProperties,
ServiceBus as ServiceBusEndpointProperties,
)
logger = get_logger(__name__)
class BaseEndpointBuilder(ABC):
def __init__(
self,
endpoint_resource_name: str,
endpoint_resource_group: str,
auth_type: str = ADTEndpointAuthType.keybased.value,
dead_letter_secret: Optional[str] = None,
dead_letter_uri: Optional[str] = None,
endpoint_subscription: Optional[str] = None,
identity: Optional[str] = None,
):
self.cli = EmbeddedCLI()
self.error_prefix = "Could not create ADT instance endpoint. Unable to retrieve"
self.endpoint_resource_name = endpoint_resource_name
self.endpoint_resource_group = endpoint_resource_group
self.endpoint_subscription = endpoint_subscription
self.auth_type = auth_type
self.dead_letter_secret = dead_letter_secret
self.dead_letter_uri = dead_letter_uri
self.identity = identity
def build_endpoint(self):
endpoint_properties = (
self.build_key_based()
if self.auth_type == ADTEndpointAuthType.keybased.value
else self.build_identity_based()
)
endpoint_properties.authentication_type = self.auth_type
if self.identity == SYSTEM_IDENTITY:
endpoint_properties.identity = ManagedIdentityReference(
type=DigitalTwinsIdentityType.system_assigned.value
)
elif self.identity is not None:
endpoint_properties.identity = ManagedIdentityReference(
type=DigitalTwinsIdentityType.user_assigned.value,
user_assigned_identity=self.identity
)
return endpoint_properties
@abstractmethod
def build_key_based(self):
pass
@abstractmethod
def build_identity_based(self):
pass
class EventGridEndpointBuilder(BaseEndpointBuilder):
def __init__(
self,
endpoint_resource_name: str,
endpoint_resource_group: str,
auth_type: str = ADTEndpointAuthType.keybased.value,
dead_letter_secret: Optional[str] = None,
dead_letter_uri: Optional[str] = None,
endpoint_subscription: Optional[str] = None,
identity: Optional[str] = None,
):
super().__init__(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
identity=identity,
)
def build_key_based(self):
eg_topic_keys_op = self.cli.invoke(
"eventgrid topic key list --name {} -g {}".format(
self.endpoint_resource_name, self.endpoint_resource_group
),
subscription=self.endpoint_subscription,
)
if not eg_topic_keys_op.success():
raise CLIInternalError("{} Event Grid topic keys.".format(self.error_prefix))
eg_topic_keys = eg_topic_keys_op.as_json()
eg_topic_endpoint_op = self.cli.invoke(
"eventgrid topic show --name {} -g {}".format(
self.endpoint_resource_name, self.endpoint_resource_group
),
subscription=self.endpoint_subscription,
)
if not eg_topic_endpoint_op.success():
raise CLIInternalError("{} Event Grid topic endpoint.".format(self.error_prefix))
eg_topic_endpoint = eg_topic_endpoint_op.as_json()
# TODO: Potentionally have shared attributes handled by build_endpoint()
return EventGridEndpointProperties(
access_key1=eg_topic_keys["key1"],
access_key2=eg_topic_keys["key2"],
dead_letter_secret=self.dead_letter_secret,
dead_letter_uri=self.dead_letter_uri,
topic_endpoint=eg_topic_endpoint["endpoint"],
)
def build_identity_based(self):
raise CLIInternalError(
"Identity based EventGrid endpoint creation is not yet supported. "
)
class ServiceBusEndpointBuilder(BaseEndpointBuilder):
def __init__(
self,
endpoint_resource_name: str,
endpoint_resource_group: str,
endpoint_resource_namespace: str,
endpoint_resource_policy: str,
auth_type: str = ADTEndpointAuthType.keybased.value,
dead_letter_secret: Optional[str] = None,
dead_letter_uri: Optional[str] = None,
endpoint_subscription: Optional[str] = None,
identity: Optional[str] = None,
):
super().__init__(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
identity=identity,
)
self.endpoint_resource_namespace = endpoint_resource_namespace
self.endpoint_resource_policy = endpoint_resource_policy
def build_key_based(self):
sb_topic_keys_op = self.cli.invoke(
"servicebus topic authorization-rule keys list --name {} "
"--namespace-name {} -g {} --topic-name {}".format(
self.endpoint_resource_policy,
self.endpoint_resource_namespace,
self.endpoint_resource_group,
self.endpoint_resource_name,
),
subscription=self.endpoint_subscription,
)
if not sb_topic_keys_op.success():
raise CLIInternalError("{} Service Bus topic keys.".format(self.error_prefix))
sb_topic_keys = sb_topic_keys_op.as_json()
return ServiceBusEndpointProperties(
primary_connection_string=sb_topic_keys["primaryConnectionString"],
secondary_connection_string=sb_topic_keys["secondaryConnectionString"],
dead_letter_secret=self.dead_letter_secret,
dead_letter_uri=self.dead_letter_uri,
)
def build_identity_based(self):
sb_namespace_op = self.cli.invoke(
"servicebus namespace show --name {} -g {}".format(
self.endpoint_resource_namespace,
self.endpoint_resource_group,
),
subscription=self.endpoint_subscription,
)
if not sb_namespace_op.success():
raise CLIInternalError("{} Service Bus Namespace.".format(self.error_prefix))
sb_namespace_meta = sb_namespace_op.as_json()
sb_endpoint = sb_namespace_meta["serviceBusEndpoint"]
sb_topic_op = self.cli.invoke(
"servicebus topic show --name {} --namespace {} -g {}".format(
self.endpoint_resource_name,
self.endpoint_resource_namespace,
self.endpoint_resource_group,
),
subscription=self.endpoint_subscription,
)
if not sb_topic_op.success():
raise CLIInternalError("{} Service Bus Topic.".format(self.error_prefix))
return ServiceBusEndpointProperties(
endpoint_uri=transform_sb_hostname_to_schemauri(sb_endpoint),
entity_path=self.endpoint_resource_name,
dead_letter_secret=self.dead_letter_secret,
dead_letter_uri=self.dead_letter_uri,
)
class EventHubEndpointBuilder(BaseEndpointBuilder):
def __init__(
self,
endpoint_resource_name: str,
endpoint_resource_group: str,
endpoint_resource_namespace: str,
endpoint_resource_policy: str,
auth_type: str = ADTEndpointAuthType.keybased.value,
dead_letter_secret: Optional[str] = None,
dead_letter_uri: Optional[str] = None,
endpoint_subscription: Optional[str] = None,
identity: Optional[str] = None,
):
super().__init__(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
identity=identity,
)
self.endpoint_resource_namespace = endpoint_resource_namespace
self.endpoint_resource_policy = endpoint_resource_policy
def build_key_based(self):
eventhub_topic_keys_op = self.cli.invoke(
"eventhubs eventhub authorization-rule keys list --name {} "
"--namespace-name {} -g {} --eventhub-name {}".format(
self.endpoint_resource_policy,
self.endpoint_resource_namespace,
self.endpoint_resource_group,
self.endpoint_resource_name,
),
subscription=self.endpoint_subscription,
)
if not eventhub_topic_keys_op.success():
raise CLIInternalError("{} Event Hub keys.".format(self.error_prefix))
eventhub_topic_keys = eventhub_topic_keys_op.as_json()
return EventHubEndpointProperties(
connection_string_primary_key=eventhub_topic_keys[
"primaryConnectionString"
],
connection_string_secondary_key=eventhub_topic_keys[
"secondaryConnectionString"
],
dead_letter_secret=self.dead_letter_secret,
dead_letter_uri=self.dead_letter_uri,
)
def build_identity_based(self):
sb_namespace_op = self.cli.invoke(
"eventhubs namespace show --name {} -g {}".format(
self.endpoint_resource_namespace,
self.endpoint_resource_group,
),
subscription=self.endpoint_subscription,
)
if not sb_namespace_op.success():
raise CLIInternalError("{} EventHub Namespace.".format(self.error_prefix))
sb_namespace_meta = sb_namespace_op.as_json()
sb_endpoint = sb_namespace_meta["serviceBusEndpoint"]
sb_topic_op = self.cli.invoke(
"eventhubs eventhub show --name {} --namespace {} -g {}".format(
self.endpoint_resource_name,
self.endpoint_resource_namespace,
self.endpoint_resource_group,
),
subscription=self.endpoint_subscription,
)
if not sb_topic_op.success():
raise CLIInternalError("{} EventHub.".format(self.error_prefix))
return EventHubEndpointProperties(
endpoint_uri=transform_sb_hostname_to_schemauri(sb_endpoint),
entity_path=self.endpoint_resource_name,
dead_letter_secret=self.dead_letter_secret,
dead_letter_uri=self.dead_letter_uri,
)
def transform_sb_hostname_to_schemauri(endpoint):
from urllib.parse import urlparse
sb_endpoint_parts = urlparse(endpoint)
sb_hostname = sb_endpoint_parts.hostname
sb_schema_uri = "sb://{}/".format(sb_hostname)
return sb_schema_uri
def build_endpoint(
endpoint_resource_type: str,
endpoint_resource_name: str,
endpoint_resource_group: str,
auth_type: str = ADTEndpointAuthType.keybased.value,
endpoint_resource_namespace: Optional[str] = None,
endpoint_resource_policy: Optional[str] = None,
dead_letter_secret: Optional[str] = None,
dead_letter_uri: Optional[str] = None,
endpoint_subscription: Optional[str] = None,
identity: Optional[str] = None,
):
from azext_iot.digitaltwins.common import ADTEndpointType
if endpoint_resource_type == ADTEndpointType.eventgridtopic.value:
return EventGridEndpointBuilder(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
identity=identity,
).build_endpoint()
if endpoint_resource_type == ADTEndpointType.servicebus.value:
return ServiceBusEndpointBuilder(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
endpoint_resource_namespace=endpoint_resource_namespace,
endpoint_resource_policy=endpoint_resource_policy,
identity=identity,
).build_endpoint()
if endpoint_resource_type == ADTEndpointType.eventhub.value:
return EventHubEndpointBuilder(
endpoint_resource_name=endpoint_resource_name,
endpoint_resource_group=endpoint_resource_group,
auth_type=auth_type,
dead_letter_secret=dead_letter_secret,
dead_letter_uri=dead_letter_uri,
endpoint_subscription=endpoint_subscription,
endpoint_resource_namespace=endpoint_resource_namespace,
endpoint_resource_policy=endpoint_resource_policy,
identity=identity,
).build_endpoint()
raise ValueError("{} not supported.".format(endpoint_resource_type))