azext_iot/digitaltwins/commands_twins.py (131 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from azext_iot.digitaltwins.providers.twin import TwinProvider
from knack.log import get_logger
logger = get_logger(__name__)
def query_twins(
cmd, name_or_hostname, query_command, show_cost=False, resource_group_name=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.invoke_query(query=query_command, show_cost=show_cost)
def create_twin(
cmd,
name_or_hostname,
twin_id,
model_id,
if_none_match=False,
properties=None,
resource_group_name=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.create(
twin_id=twin_id, model_id=model_id, if_none_match=if_none_match, properties=properties
)
def show_twin(cmd, name_or_hostname, twin_id, resource_group_name=None):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.get(twin_id)
def update_twin(cmd, name_or_hostname, twin_id, json_patch, resource_group_name=None, etag=None):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.update(twin_id=twin_id, json_patch=json_patch, etag=etag)
def delete_twin(cmd, name_or_hostname, twin_id, resource_group_name=None, etag=None):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.delete(twin_id=twin_id, etag=etag)
def delete_all_twin(cmd, name_or_hostname, resource_group_name=None):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.delete_all()
def create_relationship(
cmd,
name_or_hostname,
twin_id,
target_twin_id,
relationship_id,
relationship,
if_none_match=False,
properties=None,
resource_group_name=None,
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.add_relationship(
twin_id=twin_id,
target_twin_id=target_twin_id,
relationship_id=relationship_id,
relationship=relationship,
if_none_match=if_none_match,
properties=properties,
)
def show_relationship(
cmd, name_or_hostname, twin_id, relationship_id, resource_group_name=None,
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.get_relationship(
twin_id=twin_id, relationship_id=relationship_id
)
def update_relationship(
cmd,
name_or_hostname,
twin_id,
relationship_id,
json_patch,
resource_group_name=None,
etag=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.update_relationship(
twin_id=twin_id, relationship_id=relationship_id, json_patch=json_patch, etag=etag
)
def list_relationships(
cmd,
name_or_hostname,
twin_id,
incoming_relationships=False,
relationship=None,
resource_group_name=None,
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.list_relationships(
twin_id=twin_id,
incoming_relationships=incoming_relationships,
relationship=relationship,
)
def delete_relationship(
cmd, name_or_hostname, twin_id, relationship_id, resource_group_name=None, etag=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.delete_relationship(
twin_id=twin_id, relationship_id=relationship_id, etag=etag
)
def delete_all_relationship(
cmd, name_or_hostname, twin_id=None, resource_group_name=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
if twin_id:
return twin_provider.delete_all_relationship(twin_id=twin_id)
return twin_provider.delete_all(only_relationships=True)
def send_telemetry(
cmd,
name_or_hostname,
twin_id,
dt_id=None,
component_path=None,
telemetry=None,
telemetry_source_time=None,
resource_group_name=None,
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.send_telemetry(
twin_id=twin_id,
dt_id=dt_id,
component_path=component_path,
telemetry=telemetry,
telemetry_source_time=telemetry_source_time
)
def show_component(
cmd, name_or_hostname, twin_id, component_path, resource_group_name=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.get_component(twin_id=twin_id, component_path=component_path)
def update_component(
cmd, name_or_hostname, twin_id, component_path, json_patch, resource_group_name=None, etag=None
):
twin_provider = TwinProvider(cmd=cmd, name=name_or_hostname, rg=resource_group_name)
return twin_provider.update_component(
twin_id=twin_id, component_path=component_path, json_patch=json_patch, etag=etag
)