# azext_iot/digitaltwins/providers/twin.py (284 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import json
from azure.cli.core.azclierror import InvalidArgumentValueError
from azext_iot.digitaltwins.providers.base import (
DigitalTwinsProvider,
ErrorResponseException,
)
from azext_iot.digitaltwins.providers.model import ModelProvider
from azext_iot.common.utility import handle_service_exception, process_json_arg
from knack.log import get_logger
# Module-level logger, named after this module; used by the provider below for
# payload tracing (info) and best-effort delete failures (warning).
logger = get_logger(__name__)
class TwinOptions:
    """Plain container for the option values handed to twins SDK operations.

    Instances are passed as the ``digital_twins_*_options`` argument of the
    SDK calls made by this module.

    Attributes:
        if_match: Value supplied by the caller, ``None`` by default
            (presumably the If-Match condition -- confirm against the SDK).
        if_none_match: Value supplied by the caller, ``None`` by default
            (presumably the If-None-Match condition -- confirm against the SDK).
        traceparent: Always initialised to ``None``.
        tracestate: Always initialised to ``None``.
    """

    def __init__(self, if_match=None, if_none_match=None):
        # Conditional-request values come straight from the caller.
        self.if_match, self.if_none_match = if_match, if_none_match
        # Trace fields cannot be set through the constructor; they start unset.
        self.traceparent = self.tracestate = None
class TwinProvider(DigitalTwinsProvider):
    """Twin, relationship, component and telemetry operations.

    Every operation is delegated to the ``query`` or ``digital_twins``
    operation group of the SDK client returned by ``get_sdk()``. An
    ``ErrorResponseException`` raised by the SDK is handed to
    ``handle_service_exception``.
    """

    def __init__(self, cmd, name, rg=None):
        """Initialise the base provider and resolve the SDK operation groups.

        Args:
            cmd: Forwarded to the base provider and to ``ModelProvider``.
            name: Forwarded as ``name`` (presumably the Digital Twins
                instance name -- confirm against the base class).
            rg: Optional, forwarded as ``rg`` (presumably the resource group).
        """
        super().__init__(cmd=cmd, name=name, rg=rg)
        self.model_provider = ModelProvider(cmd=cmd, name=name, rg=rg)
        self.query_sdk = self.get_sdk().query
        self.twins_sdk = self.get_sdk().digital_twins

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _to_patch_collection(json_patch):
        """Parse a ``--json-patch`` argument into a list of patch operations.

        A single object is wrapped in a one-element list, an array is copied.
        The resulting collection is logged at info level.

        Raises:
            InvalidArgumentValueError: The parsed content is neither an
                object (dict) nor an array (list).
        """
        parsed = process_json_arg(content=json_patch, argument_name="json-patch")
        if isinstance(parsed, dict):
            patch_collection = [parsed]
        elif isinstance(parsed, list):
            patch_collection = list(parsed)
        else:
            raise InvalidArgumentValueError(
                f"--json-patch content must be an object or array. Actual type was: {type(parsed).__name__}"
            )

        logger.info("Patch payload %s", json.dumps(patch_collection))
        return patch_collection

    @staticmethod
    def _extend_from_pager(target, pager):
        """Append the items of every remaining page of *pager* to *target*.

        *target* is extended in place so that items collected before an
        error stay available to the caller. Only ``StopIteration`` is
        swallowed here; any other exception propagates.
        """
        try:
            while True:
                target.extend(pager.advance_page())
        except StopIteration:
            # advance_page() ends the paging loop by raising StopIteration.
            pass

    # -------------------------------------------------------------------- query

    def invoke_query(self, query, show_cost):
        """Run a twin query and return all accumulated results.

        Args:
            query: Query text forwarded to the SDK ``query_twins`` operation.
            show_cost: When truthy, the cost reported by
                ``accumulate_result`` is included under ``"cost"``.

        Returns:
            dict: ``{"result": <accumulated values>}`` plus ``"cost"`` when
            requested.
        """
        from azext_iot.digitaltwins.providers.generic import accumulate_result

        try:
            accumulated_result, cost = accumulate_result(
                self.query_sdk.query_twins,
                values_name="value",
                token_name="continuationToken",
                token_arg_name="continuation_token",
                query=query,
            )
        except ErrorResponseException as e:
            handle_service_exception(e)

        query_result = {"result": accumulated_result}
        if show_cost:
            query_result["cost"] = cost
        return query_result

    # -------------------------------------------------------------------- twins

    def create(self, twin_id, model_id, if_none_match=False, properties=None):
        """Create a twin through the SDK ``add`` operation.

        Args:
            twin_id: Used as the ``$dtId`` of the payload and as the SDK id.
            model_id: Stored under ``$metadata.$model``.
            if_none_match: When truthy, ``if_none_match`` is set to ``"*"``
                on the request options; otherwise it is left as ``None``.
            properties: Optional JSON content parsed with
                ``process_json_arg`` and merged over the base payload (its
                keys win on conflict). Assumed to parse to a mapping.

        Returns:
            Whatever the SDK ``add`` call returns.
        """
        twin_request = {
            "$dtId": twin_id,
            "$metadata": {"$model": model_id},
        }

        if properties:
            properties = process_json_arg(
                content=properties, argument_name="properties"
            )
            twin_request.update(properties)

        logger.info("Twin payload %s", json.dumps(twin_request))

        try:
            options = TwinOptions(if_none_match=("*" if if_none_match else None))
            return self.twins_sdk.add(
                id=twin_id, twin=twin_request, digital_twins_add_options=options
            )
        except ErrorResponseException as e:
            handle_service_exception(e)

    def get(self, twin_id):
        """Return the JSON body of the raw ``get_by_id`` response for *twin_id*."""
        try:
            return self.twins_sdk.get_by_id(id=twin_id, raw=True).response.json()
        except ErrorResponseException as e:
            handle_service_exception(e)

    def update(self, twin_id, json_patch, etag=None):
        """Apply a JSON patch to a twin and return the twin as re-read via ``get``.

        Args:
            twin_id: Id of the twin to patch.
            json_patch: ``--json-patch`` content; an object or an array.
            etag: Value for ``if_match``; ``"*"`` is used when falsy.

        Raises:
            InvalidArgumentValueError: The patch is neither object nor array.
        """
        json_patch_collection = self._to_patch_collection(json_patch)

        try:
            options = TwinOptions(if_match=(etag if etag else "*"))
            self.twins_sdk.update(
                id=twin_id,
                patch_document=json_patch_collection,
                digital_twins_update_options=options,
                raw=True,
            )
            return self.get(twin_id=twin_id)
        except ErrorResponseException as e:
            handle_service_exception(e)

    def delete(self, twin_id, etag=None):
        """Delete a twin; ``if_match`` is *etag*, or ``"*"`` when *etag* is falsy."""
        try:
            options = TwinOptions(if_match=(etag if etag else "*"))
            self.twins_sdk.delete(
                id=twin_id, digital_twins_delete_options=options, raw=True
            )
        except ErrorResponseException as e:
            handle_service_exception(e)

    def delete_all(self, only_relationships=False):
        """Delete the relationships of every twin and, by default, the twins too.

        Twins are discovered with ``select * from digitaltwins``. Deletion is
        best effort: a failure for one twin is logged as a warning and the
        loop moves on to the next twin.

        Args:
            only_relationships: When truthy, twins themselves are kept.
        """
        # need to get all twins
        query = "select * from digitaltwins"
        twins = self.invoke_query(query=query, show_cost=False)["result"]
        print(f"Found {len(twins)} twin(s).")

        # go through and delete all
        for twin in twins:
            try:
                # Relationships go first, before the twin itself is deleted.
                self.delete_all_relationship(twin_id=twin["$dtId"])
                if not only_relationships:
                    self.delete(twin_id=twin["$dtId"])
            except Exception as e:
                logger.warning(f"Could not delete twin {twin['$dtId']}. The error is {e}")

    # ------------------------------------------------------------ relationships

    def add_relationship(
        self,
        twin_id,
        target_twin_id,
        relationship_id,
        relationship,
        if_none_match=False,
        properties=None,
    ):
        """Create a relationship and return the JSON body of the raw response.

        Args:
            twin_id: Passed to the SDK as ``id`` (the twin the relationship
                is added on).
            target_twin_id: Stored as ``$targetId`` in the payload.
            relationship_id: Id of the relationship to create.
            relationship: Stored as ``$relationshipName`` in the payload.
            if_none_match: When truthy, ``if_none_match`` is set to ``"*"``.
            properties: Optional JSON content merged over the base payload
                (its keys win on conflict). Assumed to parse to a mapping.
        """
        relationship_request = {
            "$targetId": target_twin_id,
            "$relationshipName": relationship,
        }

        if properties:
            properties = process_json_arg(
                content=properties, argument_name="properties"
            )
            relationship_request.update(properties)

        logger.info("Relationship payload %s", json.dumps(relationship_request))

        try:
            options = TwinOptions(if_none_match=("*" if if_none_match else None))
            return self.twins_sdk.add_relationship(
                id=twin_id,
                relationship_id=relationship_id,
                relationship=relationship_request,
                digital_twins_add_relationship_options=options,
                raw=True,
            ).response.json()
        except ErrorResponseException as e:
            handle_service_exception(e)

    def get_relationship(self, twin_id, relationship_id):
        """Return the JSON body of the raw ``get_relationship_by_id`` response."""
        try:
            return self.twins_sdk.get_relationship_by_id(
                id=twin_id, relationship_id=relationship_id, raw=True
            ).response.json()
        except ErrorResponseException as e:
            handle_service_exception(e)

    def list_relationships(
        self, twin_id, incoming_relationships=False, relationship=None
    ):
        """List the relationships of a twin.

        Args:
            twin_id: Twin whose relationships are listed.
            incoming_relationships: When falsy, the SDK ``list_relationships``
                pager is returned as is. When truthy, incoming relationships
                are fully paged into a list.
            relationship: Optional relationship name. For the non-incoming
                listing it is forwarded to the SDK as ``relationship_name``;
                for incoming relationships it is applied here by comparing
                each item's ``relationship_name``.

        Returns:
            The SDK pager (non-incoming) or a list of incoming relationships.
        """
        if not incoming_relationships:
            return self.twins_sdk.list_relationships(
                id=twin_id, relationship_name=relationship
            )

        incoming_pager = self.twins_sdk.list_incoming_relationships(id=twin_id)

        incoming_result = []
        try:
            self._extend_from_pager(incoming_result, incoming_pager)
        except ErrorResponseException as e:
            handle_service_exception(e)

        if relationship:
            incoming_result = [
                edge
                for edge in incoming_result
                if edge.relationship_name and edge.relationship_name == relationship
            ]

        return incoming_result

    def update_relationship(self, twin_id, relationship_id, json_patch, etag=None):
        """Patch a relationship and return it as re-read via ``get_relationship``.

        Args:
            twin_id: Passed to the SDK as ``id``.
            relationship_id: Id of the relationship to patch.
            json_patch: ``--json-patch`` content; an object or an array.
            etag: Value for ``if_match``; ``"*"`` is used when falsy.

        Raises:
            InvalidArgumentValueError: The patch is neither object nor array.
        """
        json_patch_collection = self._to_patch_collection(json_patch)

        try:
            options = TwinOptions(if_match=(etag if etag else "*"))
            self.twins_sdk.update_relationship(
                id=twin_id,
                relationship_id=relationship_id,
                patch_document=json_patch_collection,
                digital_twins_update_relationship_options=options,
            )
            return self.get_relationship(
                twin_id=twin_id, relationship_id=relationship_id
            )
        except ErrorResponseException as e:
            handle_service_exception(e)

    def delete_relationship(self, twin_id, relationship_id, etag=None):
        """Delete one relationship; ``if_match`` is *etag*, or ``"*"`` when falsy."""
        try:
            options = TwinOptions(if_match=(etag if etag else "*"))
            self.twins_sdk.delete_relationship(
                id=twin_id,
                relationship_id=relationship_id,
                digital_twins_delete_relationship_options=options,
            )
        except ErrorResponseException as e:
            handle_service_exception(e)

    def delete_all_relationship(self, twin_id):
        """Delete every incoming and outgoing relationship of *twin_id*.

        Deletion is best effort: a failure for one relationship is logged as
        a warning and the remaining relationships are still attempted.
        """
        # Incoming relationships come back as a fully materialised list ...
        relationships = self.list_relationships(twin_id, incoming_relationships=True)
        # ... while the default listing returns a pager that has to be
        # advanced page by page to obtain the relationships.
        outgoing_pager = self.list_relationships(twin_id)
        self._extend_from_pager(relationships, outgoing_pager)

        print(f"Found {len(relationships)} relationship(s) associated with twin {twin_id}.")

        for relationship in relationships:
            try:
                if isinstance(relationship, dict):
                    # Dict items carry only their id, so they are deleted on
                    # the twin being processed.
                    self.delete_relationship(
                        twin_id=twin_id,
                        relationship_id=relationship['$relationshipId']
                    )
                else:
                    # Object items expose their own source twin id, which is
                    # the twin the relationship has to be deleted on.
                    self.delete_relationship(
                        twin_id=relationship.source_id,
                        relationship_id=relationship.relationship_id
                    )
            except Exception as e:
                logger.warning(f"Could not delete relationship {relationship}. The error is {e}.")

    # --------------------------------------------------------------- components

    def get_component(self, twin_id, component_path):
        """Return the JSON body of the raw ``get_component`` response."""
        try:
            return self.twins_sdk.get_component(
                id=twin_id, component_path=component_path, raw=True
            ).response.json()
        except ErrorResponseException as e:
            handle_service_exception(e)

    def update_component(self, twin_id, component_path, json_patch, etag=None):
        """Patch a component and return it as re-read via ``get_component``.

        Args:
            twin_id: Id of the twin owning the component.
            component_path: Path of the component to patch.
            json_patch: ``--json-patch`` content; an object or an array.
            etag: Value for ``if_match``; ``"*"`` is used when falsy.

        Raises:
            InvalidArgumentValueError: The patch is neither object nor array.
        """
        json_patch_collection = self._to_patch_collection(json_patch)

        try:
            options = TwinOptions(if_match=(etag if etag else "*"))
            self.twins_sdk.update_component(
                id=twin_id,
                component_path=component_path,
                patch_document=json_patch_collection,
                digital_twins_update_component_options=options,
            )
            return self.get_component(twin_id=twin_id, component_path=component_path)
        except ErrorResponseException as e:
            handle_service_exception(e)

    # ---------------------------------------------------------------- telemetry

    def send_telemetry(
        self, twin_id, telemetry=None, dt_id=None, component_path=None, telemetry_source_time=None
    ):
        """Send one telemetry message for a twin or for one of its components.

        Exactly one SDK call is made: ``send_component_telemetry`` when
        *component_path* is given, ``send_telemetry`` otherwise. (Previously
        the twin-level call was also issued after the component call,
        emitting a second message with the same message id.)

        Args:
            twin_id: Id of the twin the telemetry is sent for.
            telemetry: Optional JSON content parsed with ``process_json_arg``;
                an empty payload is sent when omitted. Assumed to parse to a
                mapping.
            dt_id: Message id; a random UUID4 string is generated when falsy.
            component_path: Optional component path selecting the
                component-level operation.
            telemetry_source_time: Forwarded unchanged to the SDK call.
        """
        from uuid import uuid4
        from datetime import datetime, timezone

        # Timezone-aware "now", expressed in the local zone, in ISO 8601 form.
        local_time = datetime.now(timezone.utc).astimezone()
        dt_timestamp = local_time.isoformat()

        telemetry_request = {}
        if telemetry:
            telemetry_request.update(
                process_json_arg(content=telemetry, argument_name="telemetry")
            )

        logger.info("Telemetry payload: {}".format(json.dumps(telemetry_request)))

        if not dt_id:
            dt_id = str(uuid4())

        try:
            if component_path:
                self.twins_sdk.send_component_telemetry(
                    id=twin_id,
                    message_id=dt_id,
                    dt_timestamp=dt_timestamp,
                    component_path=component_path,
                    telemetry=telemetry_request,
                    telemetry_source_time=telemetry_source_time
                )
            else:
                self.twins_sdk.send_telemetry(
                    id=twin_id,
                    message_id=dt_id,
                    dt_timestamp=dt_timestamp,
                    telemetry=telemetry_request,
                    telemetry_source_time=telemetry_source_time
                )
        except ErrorResponseException as e:
            handle_service_exception(e)