azext_iot/iothub/commands_device_messaging.py (204 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional
from azext_iot.iothub.providers.device_messaging import DeviceMessagingProvider
from knack.log import get_logger
# Module-level logger obtained from knack and named after this module.
logger = get_logger(__name__)
def iot_device_send_message(
    cmd,
    device_id: str,
    data: str = "Ping from Az CLI IoT Extension",
    data_file_path: Optional[str] = None,
    properties: Optional[str] = None,
    msg_count: int = 1,
    device_symmetric_key: Optional[str] = None,
    certificate_file: Optional[str] = None,
    key_file: Optional[str] = None,
    passphrase: Optional[str] = None,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
    model_id: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider from the hub/device targeting arguments
    # and forwards the remaining options to its device_send_message method.
    # The provider's return value is passed back to the caller unchanged.
    provider = DeviceMessagingProvider(
        cmd=cmd,
        device_id=device_id,
        hub_name=hub_name_or_hostname,
        rg=resource_group_name,
        login=login,
    )

    # Everything not used for targeting the hub/device is handed to the provider.
    send_options = {
        "data": data,
        "data_file_path": data_file_path,
        "properties": properties,
        "msg_count": msg_count,
        "device_symmetric_key": device_symmetric_key,
        "certificate_file": certificate_file,
        "key_file": key_file,
        "passphrase": passphrase,
        "model_id": model_id,
    }
    return provider.device_send_message(**send_options)
def iot_c2d_message_complete(
    cmd,
    device_id: str,
    etag: Optional[str] = None,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and returns the
    # result of its c2d_message_complete method, called with the given etag.
    target = {
        "cmd": cmd,
        "device_id": device_id,
        "hub_name": hub_name_or_hostname,
        "rg": resource_group_name,
        "login": login,
    }
    provider = DeviceMessagingProvider(**target)
    return provider.c2d_message_complete(etag=etag)
def iot_c2d_message_reject(
    cmd,
    device_id: str,
    etag: Optional[str] = None,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and returns the
    # result of its c2d_message_reject method, called with the given etag.
    target = {
        "cmd": cmd,
        "device_id": device_id,
        "hub_name": hub_name_or_hostname,
        "rg": resource_group_name,
        "login": login,
    }
    provider = DeviceMessagingProvider(**target)
    return provider.c2d_message_reject(etag=etag)
def iot_c2d_message_abandon(
    cmd,
    device_id: str,
    etag: Optional[str] = None,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and returns the
    # result of its c2d_message_abandon method, called with the given etag.
    target = {
        "cmd": cmd,
        "device_id": device_id,
        "hub_name": hub_name_or_hostname,
        "rg": resource_group_name,
        "login": login,
    }
    provider = DeviceMessagingProvider(**target)
    return provider.c2d_message_abandon(etag=etag)
def iot_c2d_message_receive(
    cmd,
    device_id: str,
    lock_timeout: int = 60,
    abandon: bool = False,
    complete: bool = False,
    reject: bool = False,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and delegates to
    # its c2d_message_receive method, passing the lock timeout and the three
    # abandon/complete/reject flags through as given.
    provider = DeviceMessagingProvider(
        cmd=cmd,
        device_id=device_id,
        hub_name=hub_name_or_hostname,
        rg=resource_group_name,
        login=login,
    )
    return provider.c2d_message_receive(
        lock_timeout=lock_timeout,
        abandon=abandon,
        complete=complete,
        reject=reject,
    )
def iot_c2d_message_send(
    cmd,
    device_id: str,
    data: str = "Ping from Az CLI IoT Extension",
    data_file_path: Optional[str] = None,
    message_id: Optional[str] = None,
    correlation_id: Optional[str] = None,
    user_id: Optional[str] = None,
    content_encoding: str = "utf-8",
    content_type: Optional[str] = None,
    expiry_time_utc: Optional[str] = None,
    properties: Optional[str] = None,
    ack: Optional[str] = None,
    wait_on_feedback: bool = False,
    yes: bool = False,
    repair: bool = False,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
    auth_type_dataplane: Optional[str] = None
):
    # Calls ensure_uamqp with the CLI config and the yes/repair flags, then
    # builds a DeviceMessagingProvider (including auth_type_dataplane) and
    # delegates to its c2d_message_send method.
    from azext_iot.common.deps import ensure_uamqp

    # Runs before the provider is constructed, same as the original ordering.
    ensure_uamqp(cmd.cli_ctx.config, yes, repair)

    provider = DeviceMessagingProvider(
        cmd=cmd,
        device_id=device_id,
        hub_name=hub_name_or_hostname,
        rg=resource_group_name,
        login=login,
        auth_type_dataplane=auth_type_dataplane,
    )

    # `yes` and `repair` are consumed by ensure_uamqp above and not forwarded.
    message_options = {
        "data": data,
        "data_file_path": data_file_path,
        "message_id": message_id,
        "correlation_id": correlation_id,
        "user_id": user_id,
        "content_encoding": content_encoding,
        "content_type": content_type,
        "expiry_time_utc": expiry_time_utc,
        "properties": properties,
        "ack": ack,
        "wait_on_feedback": wait_on_feedback,
    }
    return provider.c2d_message_send(**message_options)
def iot_c2d_message_purge(
    cmd,
    device_id: str,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and returns the
    # result of its c2d_message_purge method, which takes no arguments here.
    target = {
        "cmd": cmd,
        "device_id": device_id,
        "hub_name": hub_name_or_hostname,
        "rg": resource_group_name,
        "login": login,
    }
    return DeviceMessagingProvider(**target).c2d_message_purge()
def iot_simulate_device(
    cmd,
    device_id: str,
    receive_settle: str = "complete",
    data: str = "Ping from Az CLI IoT Extension",
    msg_count: int = 100,
    msg_interval: int = 3,
    protocol_type: str = "mqtt",
    properties: Optional[str] = None,
    device_symmetric_key: Optional[str] = None,
    certificate_file: Optional[str] = None,
    key_file: Optional[str] = None,
    passphrase: Optional[str] = None,
    method_response_code: Optional[str] = None,
    method_response_payload: Optional[str] = None,
    init_reported_properties: Optional[str] = None,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
    model_id: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider from the hub/device targeting arguments
    # and forwards every remaining option to its simulate_device method,
    # returning that call's result unchanged.
    provider = DeviceMessagingProvider(
        cmd=cmd,
        device_id=device_id,
        hub_name=hub_name_or_hostname,
        rg=resource_group_name,
        login=login,
    )

    simulation_options = {
        "receive_settle": receive_settle,
        "data": data,
        "properties": properties,
        "msg_count": msg_count,
        "msg_interval": msg_interval,
        "protocol_type": protocol_type,
        "device_symmetric_key": device_symmetric_key,
        "certificate_file": certificate_file,
        "key_file": key_file,
        "passphrase": passphrase,
        "method_response_code": method_response_code,
        "method_response_payload": method_response_payload,
        "init_reported_properties": init_reported_properties,
        "model_id": model_id,
    }
    return provider.simulate_device(**simulation_options)
def iot_device_upload_file(
    cmd,
    device_id: str,
    file_path: str,
    content_type: str,
    hub_name_or_hostname: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    login: Optional[str] = None,
):
    # Builds a DeviceMessagingProvider for the target device and delegates to
    # its device_upload_file method with the given file path and content type.
    target = {
        "cmd": cmd,
        "device_id": device_id,
        "hub_name": hub_name_or_hostname,
        "rg": resource_group_name,
        "login": login,
    }
    provider = DeviceMessagingProvider(**target)
    return provider.device_upload_file(file_path=file_path, content_type=content_type)