in azext_iot/iothub/providers/device_messaging.py [0:0]
def _c2d_message_receive(self, lock_timeout: int = 60, ack: Optional[str] = None) -> Optional[dict]:
    """Receive a device-bound (cloud-to-device) notification and optionally settle it.

    Args:
        lock_timeout: When truthy, sent as the ``IotHub-MessageLockTimeout``
            request header (presumably seconds -- confirm against the service
            documentation). A falsy value omits the header.
        ack: Optional settle action. ``SettleType.abandon.value`` abandons the
            message, ``SettleType.reject.value`` rejects it, and any other
            truthy value completes it. Settlement is only attempted when the
            response carries an ``etag`` header, because every settle call
            needs that etag.

    Returns:
        ``None`` when the receive response is falsy or its status is not 200.
        Otherwise a dict with:

        * ``"properties"``: may contain ``"app"`` (headers prefixed with
          ``iothub-app-``, prefix removed) and ``"system"`` (headers listed in
          ``MESSAGING_HTTP_C2D_SYSTEM_PROPERTIES``).
        * ``"etag"``: the etag header without surrounding quotes, if present.
        * ``"ack"``: the requested ``ack`` value if the settle call returned
          204, else ``None``; only set when settlement was attempted.
        * ``"data"``: the decoded body, or ``NON_DECODABLE_PAYLOAD`` when the
          body cannot be decoded; only set when the body is non-empty.

    A ``CloudError`` raised by any service call is passed to
    ``handle_service_exception``.
    """
    from azext_iot.constants import MESSAGING_HTTP_C2D_SYSTEM_PROPERTIES

    request_headers = {}
    if lock_timeout:
        request_headers["IotHub-MessageLockTimeout"] = str(lock_timeout)

    try:
        result = self.device_sdk.device.receive_device_bound_notification(
            id=self.device_id, custom_headers=request_headers, raw=True
        ).response
        if not (result and result.status_code == 200):
            return None

        payload = {"properties": {}}

        if "etag" in result.headers:
            eTag = result.headers["etag"].strip('"')
            payload["etag"] = eTag

            # Settling requires the etag, so it lives inside the etag guard.
            if ack:
                if ack == SettleType.abandon.value:
                    logger.debug("__Abandoning message__")
                    ack_response = (
                        self.device_sdk.device.abandon_device_bound_notification(
                            id=self.device_id, etag=eTag, raw=True
                        )
                    )
                elif ack == SettleType.reject.value:
                    logger.debug("__Rejecting message__")
                    # Reject goes through the "complete" operation with a
                    # `reject` argument.
                    ack_response = (
                        self.device_sdk.device.complete_device_bound_notification(
                            id=self.device_id, etag=eTag, reject="", raw=True
                        )
                    )
                else:
                    logger.debug("__Completing message__")
                    ack_response = (
                        self.device_sdk.device.complete_device_bound_notification(
                            id=self.device_id, etag=eTag, raw=True
                        )
                    )

                # Only report the ack back when the service confirmed it (204).
                payload["ack"] = (
                    ack
                    if (ack_response and ack_response.response.status_code == 204)
                    else None
                )

        # Application properties: prefix match is case-insensitive, the
        # remainder of the header name is kept as received.
        app_prop_prefix = "iothub-app-"
        app_props = {
            header[len(app_prop_prefix):]: result.headers[header]
            for header in result.headers
            if header.lower().startswith(app_prop_prefix)
        }
        if app_props:
            payload["properties"]["app"] = app_props

        sys_props = {
            key: result.headers[key]
            for key in MESSAGING_HTTP_C2D_SYSTEM_PROPERTIES
            if key in result.headers
        }
        if sys_props:
            payload["properties"]["system"] = sys_props

        if result.content:
            target_encoding = result.headers.get("content-encoding", "utf-8")
            # Start from the placeholder; it is replaced only on a clean decode.
            payload["data"] = NON_DECODABLE_PAYLOAD

            # Header values may differ in case or carry stray whitespace
            # (e.g. "UTF-8"), so normalise before checking the whitelist.
            normalized_encoding = target_encoding.strip().lower()
            supported_encodings = (
                "utf-8", "utf8", "utf-16", "utf16", "utf-32", "utf32",
            )
            if normalized_encoding in supported_encodings:
                logger.info(
                    "Decoding message data encoded with: %s", target_encoding
                )
                try:
                    payload["data"] = result.content.decode(normalized_encoding)
                except Exception as decode_error:
                    # Deliberately best-effort: keep the placeholder, but
                    # leave a trace of why decoding failed.
                    logger.debug(
                        "Unable to decode message data as %s: %s",
                        target_encoding,
                        decode_error,
                    )

        return payload
    except CloudError as e:
        handle_service_exception(e)