def _show_by_type()

in azext_iot/iothub/providers/message_endpoint.py [0:0]


    def _show_by_type(self, endpoint_name: str, endpoint_type: Optional[str] = None):
        endpoints = self.hub_resource.properties.routing.endpoints
        endpoint_list = []
        if endpoint_type is None or endpoint_type.lower() == EndpointType.EventHub.value:
            endpoint_list.extend(endpoints.event_hubs)
        if endpoint_type is None or endpoint_type.lower() == EndpointType.ServiceBusQueue.value:
            endpoint_list.extend(endpoints.service_bus_queues)
        if endpoint_type is None or endpoint_type.lower() == EndpointType.ServiceBusTopic.value:
            endpoint_list.extend(endpoints.service_bus_topics)
        if endpoint_type is None or endpoint_type.lower() == EndpointType.AzureStorageContainer.value:
            endpoint_list.extend(endpoints.storage_containers)
        if (endpoint_type is None or endpoint_type.lower() == EndpointType.CosmosDBContainer.value):
            if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value:
                endpoint_list.extend(endpoints.cosmos_db_sql_containers)
            elif self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value:
                endpoint_list.extend(endpoints.cosmos_db_sql_collections)

        for endpoint in endpoint_list:
            if endpoint.name.lower() == endpoint_name.lower():
                return endpoint

        if endpoint_type:
            raise ResourceNotFoundError(
                f"{endpoint_type} endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}."
            )

        raise ResourceNotFoundError(f"Endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}.")