in azext_iot/iothub/providers/state.py [0:0]
def upload_device_identity(self, device_id: str, identity: dict) -> None:
    """Create a device on the target hub from a saved device identity.

    The authentication type stored in ``identity`` selects how the device is
    created: with its symmetric keys (SAS), with its x509 thumbprints
    (self-signed), or as an x509 CA device. Any other authentication type is
    logged as an error and no create call is made.

    Args:
        device_id: Id of the device to create.
        identity: Device identity dictionary. The keys ``authentication``
            (with ``type`` and ``x509Thumbprint``), ``capabilities`` (with
            ``iotEdge``) and ``status`` are read unconditionally;
            ``statusReason`` is optional.

    Raises:
        KeyError: If one of the unconditionally read keys is missing, or if
            a SAS identity has no ``symmetricKey`` entries.
    """
    auth_type = identity["authentication"]["type"]
    edge = identity["capabilities"]["iotEdge"]
    status = identity["status"]
    thumbprints = identity["authentication"]["x509Thumbprint"]
    ptp = thumbprints["primaryThumbprint"]
    stp = thumbprints["secondaryThumbprint"]

    # Arguments shared by every create call, regardless of auth type.
    common_args = {
        "target": self.target,
        "device_id": device_id,
        "edge_enabled": edge,
        "status": status,
        # The identity uses camelCase keys, so the reason lives under
        # "statusReason". It is optional, hence the None default.
        "status_reason": identity.get("statusReason"),
    }

    if auth_type == DeviceAuthApiType.sas.value:
        symmetric_key = identity["authentication"]["symmetricKey"]
        _iot_device_create(
            primary_key=symmetric_key["primaryKey"],
            secondary_key=symmetric_key["secondaryKey"],
            **common_args,
        )
    elif auth_type == DeviceAuthApiType.selfSigned.value:
        _iot_device_create(
            auth_method=DeviceAuthType.x509_thumbprint.value,
            primary_thumbprint=ptp,
            secondary_thumbprint=stp,
            **common_args,
        )
    elif auth_type == DeviceAuthApiType.certificateAuthority.value:
        _iot_device_create(
            auth_method=DeviceAuthType.x509_ca.value,
            primary_thumbprint=ptp,
            secondary_thumbprint=stp,
            **common_args,
        )
    else:
        logger.error(usr_msgs.BAD_DEVICE_AUTHORIZATION_MSG.format(device_id))

    # NOTE(review): the indentation of this call was not recoverable from the
    # reviewed source; it is assumed to run after the whole if/elif chain
    # rather than only in the ``else`` branch - confirm. Its return value is
    # discarded.
    _iot_device_show(target=self.target, device_id=device_id)