def monitor_feedback()

in azext_iot/monitor/event.py [0:0]


def monitor_feedback(target, device_id, wait_on_id=None, token_duration=3600):
    """Print cloud-to-device feedback records received from the feedback endpoint.

    Opens a uamqp receive client against "/messages/servicebound/feedback" and
    prints every feedback record as YAML. When ``device_id`` is given, records
    that carry a different ``deviceId`` are skipped. When ``wait_on_id`` is
    given, monitoring stops as soon as a record whose ``originalMessageId``
    equals it has been printed.

    Args:
        target: Passed through to ``_get_endpoint_and_token_auth``.
        device_id: Optional device id to filter on (case-insensitive).
        wait_on_id: Optional message id; stop and return once it is seen.
        token_duration: Not used by this function; kept so existing callers
            that pass it keep working.

    Returns:
        The matched message id when ``wait_on_id`` is seen, otherwise ``None``
        (the receive iterator ended or the AMQP connection expired).
    """
    # Normalise once instead of on every record.
    device_id_lower = device_id.lower() if device_id else None

    def handle_msg(msg):
        payload = next(msg.get_data())
        if isinstance(payload, bytes):
            payload = payload.decode("utf8")
        # assume json [] based on spec
        records = json.loads(payload)
        for record in records:
            record_device = record.get("deviceId")
            if (
                device_id_lower
                and record_device
                and record_device.lower() != device_id_lower
            ):
                # Skip only this record: the same batch may still hold
                # records for the requested device further down the list.
                continue
            print(
                yaml.safe_dump({"feedback": record}, default_flow_style=False),
                flush=True,
            )
            # .get(): a record without the key must not crash the monitor.
            if wait_on_id and record.get("originalMessageId") == wait_on_id:
                return wait_on_id
        return None

    operation = "/messages/servicebound/feedback"
    endpoint_target, token_auth = _get_endpoint_and_token_auth(
        target=target, operation=operation
    )
    device_filter_txt = (
        " filtering on device: {},".format(device_id) if device_id else ""
    )

    print(f"Starting C2D feedback monitor,{device_filter_txt} use ctrl-c to stop...")

    # Bound before the try so the finally clause never touches an unbound
    # name if constructing the client itself fails.
    client = None
    try:
        client = uamqp.ReceiveClient(
            source=endpoint_target,
            auth=token_auth,
            client_name=_get_container_id(),
            debug=DEBUG,
        )
        for msg in client.receive_messages_iter():
            match = handle_msg(msg)
            if match:
                logger.info("Requested message Id has been matched...")
                msg.accept()
                return match
    except uamqp.errors.AMQPConnectionError:
        logger.debug("AMQPS connection has expired...")
    finally:
        if client is not None:
            client.close()
    return None