in azext_iot/iothub/providers/message_endpoint.py [0:0]
def _show_by_type(self, endpoint_name: str, endpoint_type: Optional[str] = None):
    """Return the routing endpoint called ``endpoint_name`` on this hub.

    The endpoint name is matched case-insensitively. When ``endpoint_type``
    is given, it is compared case-insensitively against the ``EndpointType``
    values and only the matching collection is searched; when it is
    ``None``, every endpoint collection is searched in a fixed order
    (event hubs, service bus queues, service bus topics, storage
    containers, then Cosmos DB).

    Args:
        endpoint_name: Name of the endpoint to look up.
        endpoint_type: Optional endpoint type used to narrow the search.

    Returns:
        The first endpoint object whose name matches.

    Raises:
        ResourceNotFoundError: If no endpoint with that name (and type, when
            one was requested) exists on the hub.
    """
    endpoints = self.hub_resource.properties.routing.endpoints
    # Normalize once instead of lower-casing for every type comparison.
    requested_type = None if endpoint_type is None else endpoint_type.lower()

    # Attribute names (rather than the collections themselves) so that a
    # collection is only read when its type is actually being searched.
    typed_collections = (
        (EndpointType.EventHub, "event_hubs"),
        (EndpointType.ServiceBusQueue, "service_bus_queues"),
        (EndpointType.ServiceBusTopic, "service_bus_topics"),
        (EndpointType.AzureStorageContainer, "storage_containers"),
    )

    candidates = []
    for kind, attr_name in typed_collections:
        if requested_type is None or requested_type == kind.value:
            # NOTE(review): the collections are presumably optional on the
            # SDK model (may be None) -- confirm. `or []` keeps a missing
            # collection from turning into a TypeError in extend().
            candidates.extend(getattr(endpoints, attr_name) or [])

    if requested_type is None or requested_type == EndpointType.CosmosDBContainer.value:
        # The attribute holding Cosmos DB endpoints depends on which SDK
        # flavour `support_cosmos` reports; any other value contributes nothing.
        if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value:
            candidates.extend(endpoints.cosmos_db_sql_containers or [])
        elif self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value:
            candidates.extend(endpoints.cosmos_db_sql_collections or [])

    for endpoint in candidates:
        if endpoint.name.lower() == endpoint_name.lower():
            return endpoint

    # Mention the type in the message only when the caller restricted the search.
    if endpoint_type:
        raise ResourceNotFoundError(
            f"{endpoint_type} endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}."
        )
    raise ResourceNotFoundError(f"Endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}.")