# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
import json
from rich.console import Console
from typing import TYPE_CHECKING, Dict, List, Iterable, Optional, Union
from knack.log import get_logger
from azure.cli.core.azclierror import (
InvalidArgumentValueError,
RequiredArgumentMissingError,
)
from .user_strings import DUPLICATE_EVENT_ERROR, DUPLICATE_POINT_ERROR, INVALID_OBSERVABILITY_MODE_ERROR
from ....util import assemble_nargs_to_dict
from ....common import FileType
from ....util.az_client import get_registry_mgmt_client, wait_for_terminal_state, REGISTRY_API_VERSION
from ....util.queryable import Queryable
if TYPE_CHECKING:
from ....vendor.clients.deviceregistrymgmt.operations import AssetsOperations
console = Console()
logger = get_logger(__name__)
ASSET_RESOURCE_TYPE = "Microsoft.DeviceRegistry/assets"
VALID_DATA_OBSERVABILITY_MODES = frozenset(["None", "Gauge", "Counter", "Histogram", "Log"])
VALID_EVENT_OBSERVABILITY_MODES = frozenset(["None", "Log"])
class Assets(Queryable):
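    """
    Wrapper around the Device Registry (Microsoft.DeviceRegistry/assets) management operations
    used by the CLI command layer. Handles asset CRUD, dataset data point and event management,
    file import/export, and Azure Resource Graph queries for assets.
    """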
def __init__(self, cmd):
super().__init__(cmd=cmd)
self.deviceregistry_mgmt_client = get_registry_mgmt_client(
subscription_id=self.default_subscription_id,
api_version=REGISTRY_API_VERSION
)
self.ops: "AssetsOperations" = self.deviceregistry_mgmt_client.assets
def create(
self,
asset_name: str,
endpoint_profile: str,
instance_name: str,
resource_group_name: str,
custom_attributes: Optional[List[str]] = None,
default_topic_path: Optional[str] = None,
default_topic_retain: Optional[str] = None,
description: Optional[str] = None,
disabled: bool = False,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
events: Optional[List[str]] = None,
events_file_path: Optional[List[str]] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
instance_resource_group: Optional[str] = None,
instance_subscription: Optional[str] = None,
location: Optional[str] = None,
manufacturer: Optional[str] = None,
manufacturer_uri: Optional[str] = None,
model: Optional[str] = None,
product_code: Optional[str] = None,
serial_number: Optional[str] = None,
software_revision: Optional[str] = None,
ds_publishing_interval: int = 1000,
ds_sampling_interval: int = 500,
ds_queue_size: int = 1,
ev_publishing_interval: int = 1000,
ev_sampling_interval: int = 500,
ev_queue_size: int = 1,
tags: Optional[Dict[str, str]] = None,
**kwargs
):
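        """
        Create an asset associated with an IoT Operations instance.

        The asset's extended location is resolved from the instance, and the asset location
        defaults to the cluster location when not provided.

        Example (illustrative values):
            Assets(cmd).create(
                asset_name="my-asset",
                endpoint_profile="my-endpoint-profile",
                instance_name="my-instance",
                resource_group_name="my-rg",
            )
        """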
from .helpers import get_extended_location
extended_location = get_extended_location(
cmd=self.cmd,
instance_name=instance_name,
instance_resource_group=instance_resource_group or resource_group_name,
instance_subscription=instance_subscription
)
cluster_location = extended_location.pop("cluster_location")
# Properties
properties = {
"assetEndpointProfileRef": endpoint_profile,
"events": _process_asset_sub_points("event_notifier", events),
}
if events_file_path:
properties["events"].extend(
_process_asset_sub_points_file_path(file_path=events_file_path)
)
# Other properties
_update_properties(
properties,
custom_attributes=custom_attributes,
default_topic_path=default_topic_path,
default_topic_retain=default_topic_retain,
description=description,
display_name=display_name,
disabled=disabled,
documentation_uri=documentation_uri,
external_asset_id=external_asset_id,
hardware_revision=hardware_revision,
manufacturer=manufacturer,
manufacturer_uri=manufacturer_uri,
model=model,
product_code=product_code,
serial_number=serial_number,
software_revision=software_revision,
ds_publishing_interval=ds_publishing_interval,
ds_sampling_interval=ds_sampling_interval,
ds_queue_size=ds_queue_size,
ev_publishing_interval=ev_publishing_interval,
ev_sampling_interval=ev_sampling_interval,
ev_queue_size=ev_queue_size,
)
asset_body = {
"extendedLocation": extended_location,
"location": location or cluster_location,
"properties": properties,
"tags": tags,
}
with console.status(f"Creating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
resource=asset_body
)
return wait_for_terminal_state(poller, **kwargs)
def delete(self, asset_name: str, resource_group_name: str, **kwargs):
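        """Delete an asset. The asset is fetched first so cluster connectivity can be verified."""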
self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
with console.status(f"Deleting {asset_name}..."):
poller = self.ops.begin_delete(
resource_group_name,
asset_name,
)
return wait_for_terminal_state(poller, **kwargs)
def show(
self, asset_name: str, resource_group_name: str, check_cluster: bool = False
) -> dict:
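        """Get an asset. When check_cluster is set, connectivity to the asset's cluster is verified."""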
asset = self.ops.get(
resource_group_name=resource_group_name, asset_name=asset_name
)
if check_cluster:
from .helpers import check_cluster_connectivity
check_cluster_connectivity(self.cmd, asset)
return asset
def list(self, resource_group_name: Optional[str] = None) -> Iterable[dict]:
if resource_group_name:
return self.ops.list_by_resource_group(resource_group_name=resource_group_name)
return self.ops.list_by_subscription()
def query_assets(
self,
asset_name: Optional[str] = None,
custom_query: Optional[str] = None,
default_topic_path: Optional[str] = None,
default_topic_retain: Optional[str] = None,
description: Optional[str] = None,
disabled: Optional[bool] = None,
discovered: Optional[bool] = None,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
endpoint_profile: Optional[str] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
instance_name: Optional[str] = None,
instance_resource_group: Optional[str] = None,
location: Optional[str] = None,
manufacturer: Optional[str] = None,
manufacturer_uri: Optional[str] = None,
model: Optional[str] = None,
product_code: Optional[str] = None,
resource_group_name: Optional[str] = None,
serial_number: Optional[str] = None,
software_revision: Optional[str] = None,
):
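        """
        Query assets via Azure Resource Graph, optionally filtering by asset properties.
        When instance filters are provided, the asset query is joined with the IoT Operations
        instance query on the custom location.
        """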
query_body = custom_query or _build_query_body(
asset_name=asset_name,
default_topic_path=default_topic_path,
default_topic_retain=default_topic_retain,
description=description,
disabled=disabled,
display_name=display_name,
documentation_uri=documentation_uri,
endpoint_profile=endpoint_profile,
external_asset_id=external_asset_id,
hardware_revision=hardware_revision,
location=location,
manufacturer=manufacturer,
manufacturer_uri=manufacturer_uri,
model=model,
product_code=product_code,
resource_group_name=resource_group_name,
serial_number=serial_number,
software_revision=software_revision
)
query = f"Resources | where type =~\"{ASSET_RESOURCE_TYPE}\" " + query_body
if any([instance_name, instance_resource_group]):
instance_query = "Resources | where type =~ 'microsoft.iotoperations/instances' "
if instance_name:
instance_query += f"| where name =~ \"{instance_name}\""
if instance_resource_group:
instance_query += f"| where resourceGroup =~ \"{instance_resource_group}\""
            # fetch the custom location and join on it (innerunique), then drop the duplicate customLocation1 column produced by the join
query = f"{instance_query} | extend customLocation = tostring(extendedLocation.name) "\
f"| project customLocation | join kind=innerunique ({query}) on customLocation "\
"| project-away customLocation1"
return self.query(query=query)
def update(
self,
asset_name: str,
resource_group_name: str,
custom_attributes: Optional[List[str]] = None,
default_topic_path: Optional[str] = None,
default_topic_retain: Optional[str] = None,
description: Optional[str] = None,
disabled: Optional[bool] = None,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
hardware_revision: Optional[str] = None,
manufacturer: Optional[str] = None,
manufacturer_uri: Optional[str] = None,
model: Optional[str] = None,
product_code: Optional[str] = None,
serial_number: Optional[str] = None,
software_revision: Optional[str] = None,
ds_publishing_interval: Optional[int] = None,
ds_sampling_interval: Optional[int] = None,
ds_queue_size: Optional[int] = None,
ev_publishing_interval: Optional[int] = None,
ev_sampling_interval: Optional[int] = None,
ev_queue_size: Optional[int] = None,
tags: Optional[Dict[str, str]] = None,
**kwargs
):
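        """Update an asset's tags and writable properties by modifying the fetched asset and replacing it."""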
# get the asset
original_asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
if tags:
original_asset["tags"] = tags
# Other properties
_update_properties(
original_asset["properties"],
custom_attributes=custom_attributes,
default_topic_path=default_topic_path,
default_topic_retain=default_topic_retain,
description=description,
disabled=disabled,
documentation_uri=documentation_uri,
display_name=display_name,
hardware_revision=hardware_revision,
manufacturer=manufacturer,
manufacturer_uri=manufacturer_uri,
model=model,
product_code=product_code,
serial_number=serial_number,
software_revision=software_revision,
ds_publishing_interval=ds_publishing_interval,
ds_sampling_interval=ds_sampling_interval,
ds_queue_size=ds_queue_size,
ev_publishing_interval=ev_publishing_interval,
ev_sampling_interval=ev_sampling_interval,
ev_queue_size=ev_queue_size,
)
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
original_asset
)
return wait_for_terminal_state(poller, **kwargs)
# Dataset
# TODO: multi-dataset support
def list_datasets(
self,
asset_name: str,
resource_group_name: str,
):
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
)
return asset["properties"].get("datasets", [])
def show_dataset(
self,
asset_name: str,
dataset_name: str,
resource_group_name: str,
):
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name
)
return _get_dataset(asset, dataset_name)
# Data points
def add_dataset_data_point(
self,
asset_name: str,
dataset_name: str,
data_point_name: str,
data_source: str,
resource_group_name: str,
observability_mode: Optional[str] = None,
queue_size: Optional[int] = None,
sampling_interval: Optional[int] = None,
replace: bool = False,
**kwargs
):
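        """
        Add a data point to a dataset and return the dataset's data points.

        Raises InvalidArgumentValueError if the data point name already exists and replace is not set.
        """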
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
dataset = _get_dataset(asset, dataset_name, create_if_none=True)
dataset["dataPoints"] = dataset.get("dataPoints", [])
point_names = [point["name"] for point in dataset["dataPoints"]]
if not replace and data_point_name in point_names:
raise InvalidArgumentValueError(
DUPLICATE_POINT_ERROR.format(data_point_name)
)
sub_point = _build_asset_sub_point(
data_source=data_source,
name=data_point_name,
observability_mode=observability_mode,
queue_size=queue_size,
sampling_interval=sampling_interval,
)
dataset["dataPoints"].append(sub_point)
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return _get_dataset(asset, dataset_name)["dataPoints"]
def export_dataset_data_points(
self,
asset_name: str,
dataset_name: str,
resource_group_name: str,
extension: str = FileType.json.value,
output_dir: str = ".",
replace: Optional[bool] = False
):
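        """
        Export a dataset's data points to a file and return {"file_path": <path>}.
        CSV-style exports flatten each point's configuration into columns.
        """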
from ....util import dump_content_to_file
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name
)
dataset = _get_dataset(asset, dataset_name)
fieldnames = None
        if extension in [FileType.csv.value, FileType.portal_csv.value]:
default_configuration = dataset.get("datasetConfiguration", "{}")
if default_configuration == "{}":
default_configuration = asset["properties"].get("defaultDatasetsConfiguration", "{}")
fieldnames = _convert_sub_points_to_csv(
sub_points=dataset.get("dataPoints", []),
sub_point_type="dataPoints",
default_configuration=default_configuration,
                portal_friendly=extension == FileType.portal_csv.value
)
extension = extension.replace("-", ".")
file_path = dump_content_to_file(
content=dataset.get("dataPoints", []),
file_name=f"{asset_name}_{dataset_name}_datapoints",
extension=extension,
fieldnames=fieldnames,
output_dir=output_dir,
replace=replace
)
return {"file_path": file_path}
def import_dataset_data_points(
self,
asset_name: str,
dataset_name: str,
file_path: str,
resource_group_name: str,
replace: bool = False,
**kwargs
):
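        """
        Import data points from a file into a dataset and return the merged data points.
        Existing points with matching names are kept unless replace is set.
        """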
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
        # _get_dataset returns a reference into the asset body, so changes to the dataset update the asset in place
dataset = _get_dataset(asset, dataset_name, create_if_none=True)
dataset["dataPoints"] = _process_asset_sub_points_file_path(
file_path=file_path,
original_items=dataset.get("dataPoints", []),
point_key="name",
replace=replace
)
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return _get_dataset(asset, dataset_name)["dataPoints"]
def list_dataset_data_points(
self,
asset_name: str,
dataset_name: str,
resource_group_name: str,
):
dataset = self.show_dataset(
asset_name=asset_name,
dataset_name=dataset_name,
resource_group_name=resource_group_name,
)
return dataset.get("dataPoints", [])
def remove_dataset_data_point(
self,
asset_name: str,
dataset_name: str,
data_point_name: str,
resource_group_name: str,
**kwargs,
):
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
dataset = _get_dataset(asset, dataset_name)
dataset["dataPoints"] = [dp for dp in dataset.get("dataPoints", []) if dp["name"] != data_point_name]
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return _get_dataset(asset, dataset_name)["dataPoints"]
# Events
def add_event(
self,
asset_name: str,
resource_group_name: str,
event_notifier: str,
event_name: Optional[str] = None,
observability_mode: Optional[str] = None,
queue_size: Optional[int] = None,
sampling_interval: Optional[int] = None,
topic_path: Optional[str] = None,
topic_retain: Optional[str] = None,
replace: bool = False,
**kwargs
):
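        """
        Add an event to the asset and return the asset's events. An optional topic path/retain
        can be set for the event.

        Raises InvalidArgumentValueError if the event name already exists and replace is not set.
        """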
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
asset["properties"]["events"] = asset["properties"].get("events", [])
event_names = [event["name"] for event in asset["properties"]["events"]]
if not replace and event_name in event_names:
raise InvalidArgumentValueError(
DUPLICATE_EVENT_ERROR.format(event_name)
)
sub_point = _build_asset_sub_point(
event_notifier=event_notifier,
name=event_name,
observability_mode=observability_mode,
queue_size=queue_size,
sampling_interval=sampling_interval
)
if topic_path:
sub_point["topic"] = {
"path": topic_path,
"retain": topic_retain or "Never"
}
asset["properties"]["events"].append(sub_point)
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return asset["properties"]["events"]
def export_events(
self,
asset_name: str,
resource_group_name: str,
extension: str = FileType.json.value,
output_dir: str = ".",
replace: Optional[bool] = False
):
from ....util import dump_content_to_file
asset_props = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
)["properties"]
fieldnames = None
        if extension in [FileType.csv.value, FileType.portal_csv.value]:
default_configuration = asset_props.get("defaultEventsConfiguration", "{}")
fieldnames = _convert_sub_points_to_csv(
sub_points=asset_props.get("events", []),
sub_point_type="events",
default_configuration=default_configuration,
                portal_friendly=extension == FileType.portal_csv.value
)
extension = extension.replace("-", ".")
file_path = dump_content_to_file(
content=asset_props.get("events", []),
file_name=f"{asset_name}_events",
extension=extension,
fieldnames=fieldnames,
output_dir=output_dir,
replace=replace
)
return {"file_path": file_path}
def import_events(
self,
asset_name: str,
file_path: str,
resource_group_name: str,
replace: bool = False,
**kwargs
):
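        """
        Import events from a file and return the merged events.
        Existing events with matching names are kept unless replace is set.
        """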
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
asset["properties"]["events"] = _process_asset_sub_points_file_path(
file_path=file_path,
original_items=asset["properties"].get("events", []),
point_key="name",
replace=replace
)
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return asset["properties"]["events"]
def list_events(
self,
asset_name: str,
resource_group_name: str
):
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
)
return asset["properties"].get("events", [])
def remove_event(
self,
asset_name: str,
event_name: str,
resource_group_name: str,
**kwargs
):
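        """Remove an event by name and return the remaining events."""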
asset = self.show(
asset_name=asset_name,
resource_group_name=resource_group_name,
check_cluster=True
)
asset["properties"]["events"] = [
ev for ev in asset["properties"].get("events", []) if ev["name"] != event_name
]
# note that update does not return the properties
with console.status(f"Updating {asset_name}..."):
poller = self.ops.begin_create_or_replace(
resource_group_name,
asset_name,
asset
)
asset = wait_for_terminal_state(poller, **kwargs)
if not isinstance(asset, dict):
asset = asset.as_dict()
return asset["properties"]["events"]
# New Helpers
def _get_dataset(asset: dict, dataset_name: str, create_if_none: bool = False):
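    """
    Fetch a dataset from the asset by name, returning a direct reference into the asset body.
    Datasets with empty names are temporarily treated as "default"; only the "default" dataset
    can be created when create_if_none is set.
    """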
    # ensure the datasets list exists before looking up the dataset
asset["properties"]["datasets"] = asset["properties"].get("datasets", [])
datasets = asset["properties"]["datasets"]
matched_datasets = [dset for dset in datasets if dset["name"] == dataset_name]
    # temporarily treat a dataset with an empty name as the "default" dataset
if not matched_datasets and dataset_name == "default":
matched_datasets = [dset for dset in datasets if dset["name"] == ""]
    # create the dataset if it does not exist yet (add/import flows)
if not matched_datasets and create_if_none:
if dataset_name != "default":
raise InvalidArgumentValueError("Currently only one dataset with the name default is supported.")
matched_datasets = [{}]
datasets.extend(matched_datasets)
elif not matched_datasets:
raise InvalidArgumentValueError(f"Dataset {dataset_name} not found in asset {asset['name']}.")
# note: right now we can have datasets with the same name but this will not be allowed later
    # part of the temporary empty-name conversion above
matched_datasets[0]["name"] = dataset_name
return matched_datasets[0]
def _build_topic(
original_topic: Optional[Dict[str, str]] = None,
topic_path: Optional[str] = None,
topic_retain: Optional[str] = None
) -> Dict[str, str]:
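    """
    Build or update a topic object. A path is required and retain defaults to "Never".

    For example, _build_topic(topic_path="/path/to/topic") should yield
    {"path": "/path/to/topic", "retain": "Never"}.
    """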
if not original_topic:
original_topic = {}
if topic_path:
original_topic["path"] = topic_path
if topic_retain:
original_topic["retain"] = topic_retain
elif not original_topic.get("retain"):
original_topic["retain"] = "Never"
if not original_topic.get("path"):
raise RequiredArgumentMissingError("Topic path is needed for a topic configuration.")
return original_topic
def _process_asset_sub_points_file_path(
file_path: str,
original_items: Optional[List[dict]] = None,
point_key: Optional[str] = None,
replace: bool = False
) -> List[Dict[str, str]]:
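    """
    Deserialize data points/events from a file and optionally merge them with existing items
    keyed by point_key. When replace is False, existing items win over file items with the same key.
    """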
from ....util import deserialize_file_content
file_points = list(deserialize_file_content(file_path=file_path))
_convert_sub_points_from_csv(file_points)
if point_key is None:
return file_points
if not original_items:
original_items = []
original_points = {point[point_key]: point for point in original_items}
file_points = {point[point_key]: point for point in file_points}
for key in file_points:
if key in original_points and not replace:
logger.warning(f"{key} is already present in the asset and will be ignored.")
else:
original_points[key] = file_points[key]
return list(original_points.values())
def _build_query_body(
asset_name: Optional[str] = None,
default_topic_path: Optional[str] = None,
default_topic_retain: Optional[str] = None,
description: Optional[str] = None,
disabled: Optional[bool] = None,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
endpoint_profile: Optional[str] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
location: Optional[str] = None,
manufacturer: Optional[str] = None,
manufacturer_uri: Optional[str] = None,
model: Optional[str] = None,
product_code: Optional[str] = None,
resource_group_name: Optional[str] = None,
serial_number: Optional[str] = None,
software_revision: Optional[str] = None,
) -> str:
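    """Build the where-clause filters and final projection for the asset Resource Graph query."""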
query_body = ""
if resource_group_name:
query_body += f"| where resourceGroup =~ \"{resource_group_name}\""
if location:
query_body += f"| where location =~ \"{location}\""
if asset_name:
query_body += f"| where name =~ \"{asset_name}\""
if default_topic_path:
query_body += f"| where properties.defaultTopic.path =~ \"{default_topic_path}\""
if default_topic_retain:
query_body += f"| where properties.defaultTopic.retain =~ \"{default_topic_retain}\""
if description:
query_body += f"| where properties.description =~ \"{description}\""
if display_name:
query_body += f"| where properties.displayName =~ \"{display_name}\""
if disabled is not None:
query_body += f"| where properties.enabled == {not disabled}"
if documentation_uri:
query_body += f"| where properties.documentationUri =~ \"{documentation_uri}\""
if endpoint_profile:
query_body += f"| where properties.assetEndpointProfileUri =~ \"{endpoint_profile}\""
if external_asset_id:
query_body += f"| where properties.externalAssetId =~ \"{external_asset_id}\""
if hardware_revision:
query_body += f"| where properties.hardwareRevision =~ \"{hardware_revision}\""
if manufacturer:
query_body += f"| where properties.manufacturer =~ \"{manufacturer}\""
if manufacturer_uri:
query_body += f"| where properties.manufacturerUri =~ \"{manufacturer_uri}\""
if model:
query_body += f"| where properties.model =~ \"{model}\""
if product_code:
query_body += f"| where properties.productCode =~ \"{product_code}\""
if serial_number:
query_body += f"| where properties.serialNumber =~ \"{serial_number}\""
if software_revision:
query_body += f"| where properties.softwareRevision =~ \"{software_revision}\""
query_body += "| extend customLocation = tostring(extendedLocation.name) "\
"| extend provisioningState = properties.provisioningState "\
"| project id, customLocation, location, name, resourceGroup, provisioningState, tags, "\
"type, subscriptionId "
return query_body
# Helpers
def _build_asset_sub_point(
data_source: Optional[str] = None,
event_notifier: Optional[str] = None,
name: Optional[str] = None,
observability_mode: Optional[str] = None,
queue_size: Optional[int] = None,
sampling_interval: Optional[int] = None,
) -> Dict[str, str]:
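    """
    Build a data point (when data_source is given) or an event (when event_notifier is given),
    validating the observability mode for that sub point type.

    For example, _build_asset_sub_point(data_source="nodeId", name="temp", queue_size=4)
    should yield {"name": "temp", "dataSource": "nodeId",
    "dataPointConfiguration": '{"queueSize": 4}', "observabilityMode": "None"}.
    """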
custom_configuration = _build_default_configuration(
original_configuration="{}",
sampling_interval=sampling_interval,
queue_size=queue_size
)
result = {"name": name}
observability_mode = observability_mode.capitalize() if observability_mode else "None"
if data_source:
result["dataSource"] = data_source
result["dataPointConfiguration"] = custom_configuration
if observability_mode not in VALID_DATA_OBSERVABILITY_MODES:
raise InvalidArgumentValueError(
INVALID_OBSERVABILITY_MODE_ERROR.format(data_source, ', '.join(VALID_DATA_OBSERVABILITY_MODES))
)
elif event_notifier:
result["eventNotifier"] = event_notifier
result["eventConfiguration"] = custom_configuration
if observability_mode not in VALID_EVENT_OBSERVABILITY_MODES:
raise InvalidArgumentValueError(
INVALID_OBSERVABILITY_MODE_ERROR.format(event_notifier, ', '.join(VALID_EVENT_OBSERVABILITY_MODES))
)
result["observabilityMode"] = observability_mode
return result
def _build_default_configuration(
original_configuration: str,
publishing_interval: Optional[int] = None,
sampling_interval: Optional[int] = None,
queue_size: Optional[int] = None,
) -> str:
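    """
    Merge the provided interval/queue values into a JSON configuration string.

    For example, _build_default_configuration("{}", sampling_interval=500, queue_size=1)
    should yield '{"samplingInterval": 500, "queueSize": 1}'.
    """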
defaults = json.loads(original_configuration)
if publishing_interval:
defaults["publishingInterval"] = int(publishing_interval)
if sampling_interval:
defaults["samplingInterval"] = int(sampling_interval)
if queue_size:
defaults["queueSize"] = int(queue_size)
return json.dumps(defaults)
def _build_ordered_csv_conversion_map(sub_point_type: str, portal_friendly: bool = False) -> Dict[str, str]:
"""Results in an ordered dict for headers"""
from collections import OrderedDict
csv_conversion_map = [
("queueSize", "QueueSize" if portal_friendly else "Queue Size"),
("observabilityMode", "ObservabilityMode" if portal_friendly else "Observability Mode"),
]
if not portal_friendly or sub_point_type == "dataPoints":
csv_conversion_map.append(("samplingInterval", "Sampling Interval Milliseconds"))
if not portal_friendly:
csv_conversion_map.append(("capabilityId", "Capability Id"))
if sub_point_type == "dataPoints":
csv_conversion_map.insert(0, ("dataSource", "NodeID" if portal_friendly else "Data Source"))
csv_conversion_map.insert(1, ("name", "TagName" if portal_friendly else "Name"))
else:
csv_conversion_map.insert(0, ("eventNotifier", "EventNotifier" if portal_friendly else "Event Notifier"))
csv_conversion_map.insert(1, ("name", "EventName" if portal_friendly else "Name"))
    # final column order: dataSource/eventNotifier, name, queueSize, observabilityMode, samplingInterval, capabilityId
return OrderedDict(csv_conversion_map)
def _convert_sub_points_from_csv(sub_points: List[Dict[str, str]]):
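    """
    Convert points parsed from CSV into the asset representation in place: rename CSV headers
    to property keys, normalize observability mode casing, and fold samplingInterval/queueSize
    into the JSON configuration string.
    """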
csv_conversion_map = {
"CapabilityId": "capabilityId",
"Capability Id": "capabilityId",
"Data Source": "dataSource",
"EventName": "name",
"EventNotifier": "eventNotifier",
"Event Notifier": "eventNotifier",
"Name": "name",
"NodeID": "dataSource",
"ObservabilityMode": "observabilityMode",
"Observability Mode": "observabilityMode",
"QueueSize": "queueSize",
"Queue Size": "queueSize",
"Sampling Interval Milliseconds": "samplingInterval",
"TagName" : "name",
}
for point in sub_points:
        # the point dict still uses CSV header keys at this stage
point.pop("", None)
for key, value in csv_conversion_map.items():
if key in point:
point[value] = point.pop(key)
        # the point now uses asset property keys - apply final transformations
if point.get("observabilityMode"):
point["observabilityMode"] = point["observabilityMode"].capitalize()
configuration = {}
if point.get("samplingInterval"):
configuration["samplingInterval"] = int(point.pop("samplingInterval"))
else:
point.pop("samplingInterval", None)
if point.get("queueSize"):
configuration["queueSize"] = int(point.pop("queueSize"))
else:
point.pop("queueSize", None)
if configuration:
config_key = "dataPointConfiguration" if "dataSource" in point else "eventConfiguration"
point[config_key] = json.dumps(configuration)
def _convert_sub_points_to_csv(
sub_points: List[Dict[str, str]],
sub_point_type: str,
default_configuration: str,
portal_friendly: bool = False
) -> List[str]:
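    """
    Flatten sub points for CSV export in place and return the ordered CSV fieldnames.
    Configuration JSON is expanded into columns; portal-friendly exports use portal header
    names and drop unsupported columns.
    """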
csv_conversion_map = _build_ordered_csv_conversion_map(sub_point_type, portal_friendly)
default_configuration = json.loads(default_configuration) if portal_friendly else {}
for point in sub_points:
configuration = point.pop(f"{sub_point_type[:-1]}Configuration", "{}")
point.update(json.loads(configuration))
if portal_friendly:
point.pop("capabilityId", None)
if sub_point_type == "events":
point.pop("samplingInterval", None)
for asset_key, csv_key in csv_conversion_map.items():
point[csv_key] = point.pop(asset_key, default_configuration.get(asset_key))
return list(csv_conversion_map.values())
def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]) -> List[Dict[str, str]]:
"""This is for the main create/update asset commands"""
if not sub_points:
return []
point_type = "Data point" if required_arg == "data_source" else "Event"
invalid_arg = "event_notifier" if required_arg == "data_source" else "data_source"
processed_points = []
for point in sub_points:
parsed_points = assemble_nargs_to_dict(point)
if not parsed_points.get(required_arg):
raise RequiredArgumentMissingError(f"{point_type} ({point}) is missing the {required_arg}.")
if parsed_points.get(invalid_arg):
raise InvalidArgumentValueError(f"{point_type} does not support {invalid_arg}.")
processed_point = _build_asset_sub_point(**parsed_points)
processed_points.append(processed_point)
return processed_points
def _process_custom_attributes(current_attributes: Dict[str, str], custom_attributes: List[str]):
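    """Apply key=value custom attributes in place; an empty value removes the attribute."""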
custom_attributes = assemble_nargs_to_dict(custom_attributes)
for key, value in custom_attributes.items():
if value == "":
current_attributes.pop(key, None)
else:
current_attributes[key] = value
def _update_properties(
properties: Dict[str, Union[str, List[Dict[str, str]]]],
custom_attributes: Optional[List[str]] = None,
default_topic_path: Optional[str] = None,
default_topic_retain: Optional[str] = None,
description: Optional[str] = None,
disabled: Optional[bool] = None,
display_name: Optional[str] = None,
documentation_uri: Optional[str] = None,
external_asset_id: Optional[str] = None,
hardware_revision: Optional[str] = None,
manufacturer: Optional[str] = None,
manufacturer_uri: Optional[str] = None,
model: Optional[str] = None,
product_code: Optional[str] = None,
serial_number: Optional[str] = None,
software_revision: Optional[str] = None,
ds_publishing_interval: Optional[int] = None,
ds_sampling_interval: Optional[int] = None,
ds_queue_size: Optional[int] = None,
ev_publishing_interval: Optional[int] = None,
ev_sampling_interval: Optional[int] = None,
ev_queue_size: Optional[int] = None,
) -> None:
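    """
    Apply the provided values onto the asset properties dict in place, including the default
    dataset/event configurations and the default topic.
    """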
if description:
properties["description"] = description
if disabled is not None:
properties["enabled"] = not disabled
if display_name:
properties["displayName"] = display_name
if documentation_uri:
properties["documentationUri"] = documentation_uri
if external_asset_id:
properties["externalAssetId"] = external_asset_id
if hardware_revision:
properties["hardwareRevision"] = hardware_revision
if manufacturer:
properties["manufacturer"] = manufacturer
if manufacturer_uri:
properties["manufacturerUri"] = manufacturer_uri
if model:
properties["model"] = model
if product_code:
properties["productCode"] = product_code
if serial_number:
properties["serialNumber"] = serial_number
if software_revision:
properties["softwareRevision"] = software_revision
if custom_attributes:
if "attributes" not in properties:
properties["attributes"] = {}
_process_custom_attributes(
properties["attributes"], custom_attributes=custom_attributes
)
# Defaults
properties["defaultDatasetsConfiguration"] = _build_default_configuration(
original_configuration=properties.get("defaultDatasetsConfiguration", "{}"),
publishing_interval=ds_publishing_interval,
sampling_interval=ds_sampling_interval,
queue_size=ds_queue_size
)
properties["defaultEventsConfiguration"] = _build_default_configuration(
original_configuration=properties.get("defaultEventsConfiguration", "{}"),
publishing_interval=ev_publishing_interval,
sampling_interval=ev_sampling_interval,
queue_size=ev_queue_size
)
# TODO: unit test
if any([default_topic_path, default_topic_retain]):
properties["defaultTopic"] = _build_topic(
original_topic=properties.get("defaultTopic"),
topic_path=default_topic_path,
topic_retain=default_topic_retain
)