azext_iot/monitor/telemetry.py (151 lines of code) (raw):

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import sys
import uamqp

from uuid import uuid4
from knack.log import get_logger
from typing import List
from azext_iot.constants import VERSION, USER_AGENT
from azext_iot.monitor.models.target import Target
from azext_iot.monitor.utility import get_loop

logger = get_logger(__name__)
DEBUG = False


def start_single_monitor(
    target: Target,
    enqueued_time_utc,
    on_start_string: str,
    on_message_received,
    timeout=0,
):
    """
    Monitor a single target by delegating to start_multiple_monitors.

    :param target: The target to monitor.
    :param enqueued_time_utc: Value used in the "enqueued time greater than" source filter.
    :param on_start_string: Text printed before the monitor starts running.
    :param on_message_received: A callback to process messages as they arrive from the service.
        It takes a single argument, a ~uamqp.message.Message object.
    :param timeout: Passed through to the uamqp receive client.
    """
    return start_multiple_monitors(
        targets=[target],
        enqueued_time_utc=enqueued_time_utc,
        on_start_string=on_start_string,
        on_message_received=on_message_received,
        timeout=timeout,
    )


def _find_first_error(results):
    """
    Return the first exception found in (possibly nested) gather results, or None.

    Both gather calls in this module use return_exceptions=True, so a failure can
    show up either as a top-level entry (the whole target failed) or inside a
    target's per-partition list.
    """
    if not results:
        return None
    for entry in results:
        if isinstance(entry, BaseException):
            return entry
        if isinstance(entry, (list, tuple)):
            nested = _find_first_error(entry)
            if nested is not None:
                return nested
    return None


def start_multiple_monitors(
    targets: List[Target],
    on_start_string: str,
    enqueued_time_utc,
    on_message_received,
    timeout=0,
):
    """
    Run one event monitor per target on the loop returned by get_loop().

    :param targets: The targets to monitor.
    :param on_start_string: Text printed before the monitors start running.
    :param enqueued_time_utc: Value used in the "enqueued time greater than" source filter.
    :param on_message_received: A callback to process messages as they arrive from the service.
        It takes a single argument, a ~uamqp.message.Message object.
    :param timeout: Passed through to the uamqp receive client.
    :raises RuntimeError: If any target or partition monitor finished with an exception.
    """
    coroutines = [
        _initiate_event_monitor(
            target=target,
            enqueued_time_utc=enqueued_time_utc,
            on_message_received=on_message_received,
            timeout=timeout,
        )
        for target in targets
    ]

    loop = get_loop()

    future = asyncio.gather(*coroutines, return_exceptions=True)
    result = None

    try:
        print(on_start_string, flush=True)
        future.add_done_callback(lambda _: _stop_and_suppress_eloop(loop))
        result = loop.run_until_complete(future)
    except KeyboardInterrupt:
        print("Stopping event monitor...", flush=True)
        try:
            # TODO: remove when deprecating
            # pylint: disable=no-member
            tasks = asyncio.all_tasks(loop)
            for t in tasks:  # pylint: disable=no-member
                t.cancel()
            # Let the cancelled tasks run their cleanup handlers.
            loop.run_forever()
        except RuntimeError:
            pass  # no running loop anymore
    finally:
        # Results are nested: one entry per target, each either an exception, None
        # (target had no partitions) or a list with one entry per partition.
        # Inspect all of them rather than only result[0][0], which missed errors
        # on other partitions/targets and failed with TypeError when result[0]
        # was itself an exception.
        first_error = _find_first_error(result)
        if first_error is not None:
            logger.debug(result)
            raise RuntimeError(first_error) from first_error


async def _initiate_event_monitor(
    target: Target, enqueued_time_utc, on_message_received, timeout=0
):
    """
    Open one AMQP connection for the target and monitor all of its partitions on it.

    :return: None when the target has no partitions; otherwise the gathered
        per-partition results (exceptions are returned in the list, not raised).
    """
    if not target.partitions:
        logger.warning("No Event Hub partitions found to listen on.")
        return

    async with uamqp.ConnectionAsync(
        target.hostname,
        sasl=target.auth,
        debug=DEBUG,
        container_id=_get_container_id(),
        properties=_get_conn_props(),
    ) as conn:
        coroutines = [
            _monitor_events(
                target=target,
                connection=conn,
                partition=p,
                enqueued_time_utc=enqueued_time_utc,
                on_message_received=on_message_received,
                timeout=timeout,
            )
            for p in target.partitions
        ]
        # Awaited inside the context manager so the connection stays open
        # for as long as the partition monitors are running.
        return await asyncio.gather(*coroutines, return_exceptions=True)


async def _monitor_events(
    target: Target,
    connection,
    partition,
    enqueued_time_utc,
    on_message_received,
    timeout=0,
):
    """
    Receive messages from one partition and hand each one to on_message_received.

    :raises RuntimeError: When the link is detached (uamqp.errors.LinkDetach);
        the message is the detach description.
    """
    source = uamqp.address.Source(
        "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
            target.hostname, target.path, target.consumer_group, partition
        )
    )
    source.set_filter(
        bytes(
            "amqp.annotation.x-opt-enqueuedtimeutc > " + str(enqueued_time_utc), "utf8"
        )
    )

    # Set when a handler below has already closed the client, so that the
    # finally clause does not close it a second time.
    exp_cancelled = False
    receive_client = uamqp.ReceiveClientAsync(
        source,
        auth=target.auth,
        timeout=timeout,
        prefetch=0,
        client_name=_get_container_id(),
        debug=DEBUG,
    )

    try:
        if connection:
            await receive_client.open_async(connection=connection)

        async for msg in receive_client.receive_messages_iter_async():
            on_message_received(msg)

    except asyncio.CancelledError:
        # Cancellation is swallowed after closing the client.
        exp_cancelled = True
        await receive_client.close_async()
    except uamqp.errors.LinkDetach as ld:
        if isinstance(ld.description, bytes):
            ld.description = str(ld.description, "utf8")
        raise RuntimeError(ld.description) from ld
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt, closing monitor on partition %s", partition)
        exp_cancelled = True
        await receive_client.close_async()
        raise
    finally:
        if not exp_cancelled:
            await receive_client.close_async()
        logger.info("Closed monitor on partition %s", partition)


def _stop_and_suppress_eloop(loop):
    """Stop the loop, best-effort: a failure to stop is logged and otherwise ignored."""
    try:
        loop.stop()
    except Exception:  # pylint: disable=broad-except
        logger.debug("Ignoring error raised while stopping the event loop.", exc_info=True)


def _get_conn_props():
    """Build the properties dict passed to the AMQP connection."""
    return {
        "product": USER_AGENT,
        "version": VERSION,
        "framework": "Python {}.{}.{}".format(*sys.version_info[0:3]),
        "platform": sys.platform,
    }


def _get_container_id():
    """Return a fresh identifier of the form "<USER_AGENT>/<uuid4>"."""
    return "{}/{}".format(USER_AGENT, str(uuid4()))