in azext_iot/monitor/event.py [0:0]
def monitor_feedback(target, device_id, wait_on_id=None, token_duration=3600):
def handle_msg(msg):
payload = next(msg.get_data())
if isinstance(payload, bytes):
payload = str(payload, "utf8")
# assume json [] based on spec
payload = json.loads(payload)
for p in payload:
if (
device_id
and p.get("deviceId")
and p["deviceId"].lower() != device_id.lower()
):
return None
print(yaml.safe_dump({"feedback": p}, default_flow_style=False), flush=True)
if wait_on_id:
msg_id = p["originalMessageId"]
if msg_id == wait_on_id:
return msg_id
return None
operation = "/messages/servicebound/feedback"
endpoint_target, token_auth = _get_endpoint_and_token_auth(
target=target, operation=operation
)
device_filter_txt = None
if device_id:
device_filter_txt = " filtering on device: {},".format(device_id)
print(
f"Starting C2D feedback monitor,{device_filter_txt if device_filter_txt else ''} use ctrl-c to stop..."
)
try:
client = uamqp.ReceiveClient(
source=endpoint_target,
auth=token_auth,
client_name=_get_container_id(),
debug=DEBUG,
)
message_generator = client.receive_messages_iter()
for msg in message_generator:
match = handle_msg(msg)
if match:
logger.info("Requested message Id has been matched...")
msg.accept()
return match
except uamqp.errors.AMQPConnectionError:
logger.debug("AMQPS connection has expired...")
finally:
client.close()