azext_edge/edge/providers/rpsaas/adr/assets.py (898 lines of code) (raw):

# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
"""Asset provider: create/read/update/delete/query of Device Registry assets.

Also contains the helpers used to manage an asset's datasets, data points and
events, including import/export of those sub points from/to files.
"""

import json
from rich.console import Console
from typing import TYPE_CHECKING, Dict, List, Iterable, Optional, Union

from knack.log import get_logger
from azure.cli.core.azclierror import (
    InvalidArgumentValueError,
    RequiredArgumentMissingError,
)

from .user_strings import DUPLICATE_EVENT_ERROR, DUPLICATE_POINT_ERROR, INVALID_OBSERVABILITY_MODE_ERROR
from ....util import assemble_nargs_to_dict
from ....common import FileType
from ....util.az_client import get_registry_mgmt_client, wait_for_terminal_state, REGISTRY_API_VERSION
from ....util.queryable import Queryable

if TYPE_CHECKING:
    from ....vendor.clients.deviceregistrymgmt.operations import AssetsOperations


console = Console()
logger = get_logger(__name__)

ASSET_RESOURCE_TYPE = "Microsoft.DeviceRegistry/assets"
# Observability modes accepted for data points and for events respectively.
VALID_DATA_OBSERVABILITY_MODES = frozenset(["None", "Gauge", "Counter", "Histogram", "Log"])
VALID_EVENT_OBSERVABILITY_MODES = frozenset(["None", "Log"])


class Assets(Queryable):
    """Operations on ``Microsoft.DeviceRegistry/assets`` resources and their sub points."""

    def __init__(self, cmd):
        """Build the registry management client for the default subscription."""
        super().__init__(cmd=cmd)
        self.deviceregistry_mgmt_client = get_registry_mgmt_client(
            subscription_id=self.default_subscription_id,
            api_version=REGISTRY_API_VERSION
        )
        self.ops: "AssetsOperations" = self.deviceregistry_mgmt_client.assets

    def create(
        self,
        asset_name: str,
        endpoint_profile: str,
        instance_name: str,
        resource_group_name: str,
        custom_attributes: Optional[List[str]] = None,
        default_topic_path: Optional[str] = None,
        default_topic_retain: Optional[str] = None,
        description: Optional[str] = None,
        disabled: bool = False,
        display_name: Optional[str] = None,
        documentation_uri: Optional[str] = None,
        events: Optional[List[str]] = None,
        events_file_path: Optional[str] = None,
        external_asset_id: Optional[str] = None,
        hardware_revision: Optional[str] = None,
        instance_resource_group: Optional[str] = None,
        instance_subscription: Optional[str] = None,
        location: Optional[str] = None,
        manufacturer: Optional[str] = None,
        manufacturer_uri: Optional[str] = None,
        model: Optional[str] = None,
        product_code: Optional[str] = None,
        serial_number: Optional[str] = None,
        software_revision: Optional[str] = None,
        ds_publishing_interval: int = 1000,
        ds_sampling_interval: int = 500,
        ds_queue_size: int = 1,
        ev_publishing_interval: int = 1000,
        ev_sampling_interval: int = 500,
        ev_queue_size: int = 1,
        tags: Optional[Dict[str, str]] = None,
        **kwargs
    ):
        """Create (or replace) an asset.

        Events come from the inline ``events`` arguments plus, when given, the
        content of ``events_file_path``. The extended location is resolved via
        ``get_extended_location``; its ``cluster_location`` entry is used as the
        asset location when ``location`` is not provided. ``kwargs`` are passed
        to ``wait_for_terminal_state``, whose result is returned.
        """
        from .helpers import get_extended_location
        extended_location = get_extended_location(
            cmd=self.cmd,
            instance_name=instance_name,
            instance_resource_group=instance_resource_group or resource_group_name,
            instance_subscription=instance_subscription
        )
        # cluster_location is not part of the extended location payload, so take it out
        cluster_location = extended_location.pop("cluster_location")

        # Properties
        properties = {
            "assetEndpointProfileRef": endpoint_profile,
            "events": _process_asset_sub_points("event_notifier", events),
        }
        if events_file_path:
            properties["events"].extend(
                _process_asset_sub_points_file_path(file_path=events_file_path)
            )

        # Other properties
        _update_properties(
            properties,
            custom_attributes=custom_attributes,
            default_topic_path=default_topic_path,
            default_topic_retain=default_topic_retain,
            description=description,
            display_name=display_name,
            disabled=disabled,
            documentation_uri=documentation_uri,
            external_asset_id=external_asset_id,
            hardware_revision=hardware_revision,
            manufacturer=manufacturer,
            manufacturer_uri=manufacturer_uri,
            model=model,
            product_code=product_code,
            serial_number=serial_number,
            software_revision=software_revision,
            ds_publishing_interval=ds_publishing_interval,
            ds_sampling_interval=ds_sampling_interval,
            ds_queue_size=ds_queue_size,
            ev_publishing_interval=ev_publishing_interval,
            ev_sampling_interval=ev_sampling_interval,
            ev_queue_size=ev_queue_size,
        )

        asset_body = {
            "extendedLocation": extended_location,
            "location": location or cluster_location,
            "properties": properties,
            "tags": tags,
        }

        with console.status(f"Creating {asset_name}..."):
            poller = self.ops.begin_create_or_replace(
                resource_group_name,
                asset_name,
                resource=asset_body
            )
            return wait_for_terminal_state(poller, **kwargs)

    def delete(self, asset_name: str, resource_group_name: str, **kwargs):
        """Delete an asset after fetching it with the cluster check enabled."""
        # show() raises if the asset cannot be fetched and runs the cluster check first
        self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        with console.status(f"Deleting {asset_name}..."):
            poller = self.ops.begin_delete(
                resource_group_name,
                asset_name,
            )
            return wait_for_terminal_state(poller, **kwargs)

    def show(
        self, asset_name: str, resource_group_name: str, check_cluster: bool = False
    ) -> dict:
        """Fetch an asset; optionally run ``check_cluster_connectivity`` on it."""
        asset = self.ops.get(
            resource_group_name=resource_group_name, asset_name=asset_name
        )
        if check_cluster:
            from .helpers import check_cluster_connectivity
            check_cluster_connectivity(self.cmd, asset)
        return asset

    def list(self, resource_group_name: Optional[str] = None) -> Iterable[dict]:
        """List assets in a resource group, or in the subscription when none is given."""
        if resource_group_name:
            return self.ops.list_by_resource_group(resource_group_name=resource_group_name)
        return self.ops.list_by_subscription()

    def query_assets(
        self,
        asset_name: Optional[str] = None,
        custom_query: Optional[str] = None,
        default_topic_path: Optional[str] = None,
        default_topic_retain: Optional[str] = None,
        description: Optional[str] = None,
        disabled: Optional[bool] = None,
        discovered: Optional[bool] = None,
        display_name: Optional[str] = None,
        documentation_uri: Optional[str] = None,
        endpoint_profile: Optional[str] = None,
        external_asset_id: Optional[str] = None,
        hardware_revision: Optional[str] = None,
        instance_name: Optional[str] = None,
        instance_resource_group: Optional[str] = None,
        location: Optional[str] = None,
        manufacturer: Optional[str] = None,
        manufacturer_uri: Optional[str] = None,
        model: Optional[str] = None,
        product_code: Optional[str] = None,
        resource_group_name: Optional[str] = None,
        serial_number: Optional[str] = None,
        software_revision: Optional[str] = None,
    ):
        """Query assets through ``self.query``.

        ``custom_query``, when given, is used instead of the generated filter
        body and is appended to the base asset-type clause. When an instance
        name and/or instance resource group is given, the asset query is joined
        with the matching instances on their custom location.

        NOTE: ``discovered`` is accepted but not used by this method.
        """
        query_body = custom_query or _build_query_body(
            asset_name=asset_name,
            default_topic_path=default_topic_path,
            default_topic_retain=default_topic_retain,
            description=description,
            disabled=disabled,
            display_name=display_name,
            documentation_uri=documentation_uri,
            endpoint_profile=endpoint_profile,
            external_asset_id=external_asset_id,
            hardware_revision=hardware_revision,
            location=location,
            manufacturer=manufacturer,
            manufacturer_uri=manufacturer_uri,
            model=model,
            product_code=product_code,
            resource_group_name=resource_group_name,
            serial_number=serial_number,
            software_revision=software_revision
        )
        query = f"Resources | where type =~\"{ASSET_RESOURCE_TYPE}\" " + query_body

        if any([instance_name, instance_resource_group]):
            instance_query = "Resources | where type =~ 'microsoft.iotoperations/instances' "
            if instance_name:
                instance_query += f"| where name =~ \"{instance_name}\""
            if instance_resource_group:
                instance_query += f"| where resourceGroup =~ \"{instance_resource_group}\""

            # fetch the custom location + join on innerunique. Then remove the extra customLocation1 generated
            query = f"{instance_query} | extend customLocation = tostring(extendedLocation.name) "\
                f"| project customLocation | join kind=innerunique ({query}) on customLocation "\
                "| project-away customLocation1"
        return self.query(query=query)

    def update(
        self,
        asset_name: str,
        resource_group_name: str,
        custom_attributes: Optional[List[str]] = None,
        default_topic_path: Optional[str] = None,
        default_topic_retain: Optional[str] = None,
        description: Optional[str] = None,
        disabled: Optional[bool] = None,
        display_name: Optional[str] = None,
        documentation_uri: Optional[str] = None,
        hardware_revision: Optional[str] = None,
        manufacturer: Optional[str] = None,
        manufacturer_uri: Optional[str] = None,
        model: Optional[str] = None,
        product_code: Optional[str] = None,
        serial_number: Optional[str] = None,
        software_revision: Optional[str] = None,
        ds_publishing_interval: Optional[int] = None,
        ds_sampling_interval: Optional[int] = None,
        ds_queue_size: Optional[int] = None,
        ev_publishing_interval: Optional[int] = None,
        ev_sampling_interval: Optional[int] = None,
        ev_queue_size: Optional[int] = None,
        tags: Optional[Dict[str, str]] = None,
        **kwargs
    ):
        """Update an existing asset.

        The current asset is fetched, the provided (truthy) values are applied
        on top of its properties, and the whole asset is sent back with
        ``begin_create_or_replace``. Tags are only overwritten when ``tags`` is
        non-empty. Returns the result of ``wait_for_terminal_state``.
        """
        # get the asset
        original_asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        if tags:
            original_asset["tags"] = tags

        # Other properties
        _update_properties(
            original_asset["properties"],
            custom_attributes=custom_attributes,
            default_topic_path=default_topic_path,
            default_topic_retain=default_topic_retain,
            description=description,
            disabled=disabled,
            documentation_uri=documentation_uri,
            display_name=display_name,
            hardware_revision=hardware_revision,
            manufacturer=manufacturer,
            manufacturer_uri=manufacturer_uri,
            model=model,
            product_code=product_code,
            serial_number=serial_number,
            software_revision=software_revision,
            ds_publishing_interval=ds_publishing_interval,
            ds_sampling_interval=ds_sampling_interval,
            ds_queue_size=ds_queue_size,
            ev_publishing_interval=ev_publishing_interval,
            ev_sampling_interval=ev_sampling_interval,
            ev_queue_size=ev_queue_size,
        )

        with console.status(f"Updating {asset_name}..."):
            poller = self.ops.begin_create_or_replace(
                resource_group_name,
                asset_name,
                original_asset
            )
            return wait_for_terminal_state(poller, **kwargs)

    def _replace_asset(
        self, asset_name: str, resource_group_name: str, asset: dict, wait_kwargs: dict
    ) -> dict:
        """Send the modified asset back, wait for the operation and return the result as a dict.

        ``wait_kwargs`` is forwarded as keyword arguments to ``wait_for_terminal_state``.
        """
        with console.status(f"Updating {asset_name}..."):
            poller = self.ops.begin_create_or_replace(
                resource_group_name,
                asset_name,
                asset
            )
            updated_asset = wait_for_terminal_state(poller, **wait_kwargs)
            # the result may be a model object rather than a dict
            if not isinstance(updated_asset, dict):
                updated_asset = updated_asset.as_dict()
            return updated_asset

    # Dataset
    # TODO: multi-dataset support
    def list_datasets(
        self,
        asset_name: str,
        resource_group_name: str,
    ):
        """Return the asset's datasets (empty list when the asset has none)."""
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
        )
        return asset["properties"].get("datasets", [])

    def show_dataset(
        self,
        asset_name: str,
        dataset_name: str,
        resource_group_name: str,
    ):
        """Return one dataset of the asset; raises if it is not found."""
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name
        )
        return _get_dataset(asset, dataset_name)

    # Data points
    def add_dataset_data_point(
        self,
        asset_name: str,
        dataset_name: str,
        data_point_name: str,
        data_source: str,
        resource_group_name: str,
        observability_mode: Optional[str] = None,
        queue_size: Optional[int] = None,
        sampling_interval: Optional[int] = None,
        replace: bool = False,
        **kwargs
    ):
        """Add a data point to a dataset and return the dataset's data points after the update.

        Raises InvalidArgumentValueError when a data point with the same name
        exists and ``replace`` is False. With ``replace`` True the existing
        data point is overwritten in place rather than duplicated.
        """
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        dataset = _get_dataset(asset, dataset_name, create_if_none=True)
        dataset["dataPoints"] = dataset.get("dataPoints", [])
        point_names = [point["name"] for point in dataset["dataPoints"]]
        if not replace and data_point_name in point_names:
            raise InvalidArgumentValueError(
                DUPLICATE_POINT_ERROR.format(data_point_name)
            )

        sub_point = _build_asset_sub_point(
            data_source=data_source,
            name=data_point_name,
            observability_mode=observability_mode,
            queue_size=queue_size,
            sampling_interval=sampling_interval,
        )
        # overwrite a same-named point (replace case) instead of appending a duplicate
        _upsert_sub_point(dataset["dataPoints"], sub_point)

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return _get_dataset(asset, dataset_name)["dataPoints"]

    def export_dataset_data_points(
        self,
        asset_name: str,
        dataset_name: str,
        resource_group_name: str,
        extension: str = FileType.json.value,
        output_dir: str = ".",
        replace: Optional[bool] = False
    ):
        """Write a dataset's data points to a file and return ``{"file_path": ...}``.

        For the csv extension the points are converted to csv rows; missing
        values fall back to the dataset configuration, or to the asset's
        default datasets configuration when the dataset has none.
        """
        from ....util import dump_content_to_file
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name
        )
        dataset = _get_dataset(asset, dataset_name)
        fieldnames = None
        if extension in [FileType.csv.value]:
            default_configuration = dataset.get("datasetConfiguration", "{}")
            if default_configuration == "{}":
                default_configuration = asset["properties"].get("defaultDatasetsConfiguration", "{}")
            # converts the points in place and returns the csv header names
            fieldnames = _convert_sub_points_to_csv(
                sub_points=dataset.get("dataPoints", []),
                sub_point_type="dataPoints",
                default_configuration=default_configuration,
                portal_friendly=extension == FileType.csv.value
            )
        extension = extension.replace("-", ".")
        file_path = dump_content_to_file(
            content=dataset.get("dataPoints", []),
            file_name=f"{asset_name}_{dataset_name}_datapoints",
            extension=extension,
            fieldnames=fieldnames,
            output_dir=output_dir,
            replace=replace
        )
        return {"file_path": file_path}

    def import_dataset_data_points(
        self,
        asset_name: str,
        dataset_name: str,
        file_path: str,
        resource_group_name: str,
        replace: bool = False,
        **kwargs
    ):
        """Merge data points from a file into a dataset and return the resulting data points.

        Points are matched by ``name``; existing points are kept unless
        ``replace`` is True.
        """
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        # should get the direct object so this should be enough
        dataset = _get_dataset(asset, dataset_name, create_if_none=True)
        dataset["dataPoints"] = _process_asset_sub_points_file_path(
            file_path=file_path,
            original_items=dataset.get("dataPoints", []),
            point_key="name",
            replace=replace
        )

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return _get_dataset(asset, dataset_name)["dataPoints"]

    def list_dataset_data_points(
        self,
        asset_name: str,
        dataset_name: str,
        resource_group_name: str,
    ):
        """Return the data points of a dataset (empty list when it has none)."""
        dataset = self.show_dataset(
            asset_name=asset_name,
            dataset_name=dataset_name,
            resource_group_name=resource_group_name,
        )
        return dataset.get("dataPoints", [])

    def remove_dataset_data_point(
        self,
        asset_name: str,
        dataset_name: str,
        data_point_name: str,
        resource_group_name: str,
        **kwargs,
    ):
        """Remove every data point with the given name and return the remaining data points."""
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        dataset = _get_dataset(asset, dataset_name)
        dataset["dataPoints"] = [dp for dp in dataset.get("dataPoints", []) if dp["name"] != data_point_name]

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return _get_dataset(asset, dataset_name)["dataPoints"]

    # Events
    def add_event(
        self,
        asset_name: str,
        resource_group_name: str,
        event_notifier: str,
        event_name: Optional[str] = None,
        observability_mode: Optional[str] = None,
        queue_size: Optional[int] = None,
        sampling_interval: Optional[int] = None,
        topic_path: Optional[str] = None,
        topic_retain: Optional[str] = None,
        replace: bool = False,
        **kwargs
    ):
        """Add an event to an asset and return the asset's events after the update.

        Raises InvalidArgumentValueError when an event with the same name
        exists and ``replace`` is False. With ``replace`` True the existing
        event is overwritten in place rather than duplicated. A topic is only
        attached when ``topic_path`` is given; its retain value defaults to
        "Never".
        """
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        asset["properties"]["events"] = asset["properties"].get("events", [])
        event_names = [event["name"] for event in asset["properties"]["events"]]
        if not replace and event_name in event_names:
            raise InvalidArgumentValueError(
                DUPLICATE_EVENT_ERROR.format(event_name)
            )

        sub_point = _build_asset_sub_point(
            event_notifier=event_notifier,
            name=event_name,
            observability_mode=observability_mode,
            queue_size=queue_size,
            sampling_interval=sampling_interval
        )
        if topic_path:
            sub_point["topic"] = {
                "path": topic_path,
                "retain": topic_retain or "Never"
            }
        # overwrite a same-named event (replace case) instead of appending a duplicate
        _upsert_sub_point(asset["properties"]["events"], sub_point)

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return asset["properties"]["events"]

    def export_events(
        self,
        asset_name: str,
        resource_group_name: str,
        extension: str = FileType.json.value,
        output_dir: str = ".",
        replace: Optional[bool] = False
    ):
        """Write an asset's events to a file and return ``{"file_path": ...}``.

        For the csv extension the events are converted to csv rows; missing
        values fall back to the asset's default events configuration.
        """
        from ....util import dump_content_to_file
        asset_props = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
        )["properties"]
        fieldnames = None
        if extension in [FileType.csv.value]:
            default_configuration = asset_props.get("defaultEventsConfiguration", "{}")
            # converts the events in place and returns the csv header names
            fieldnames = _convert_sub_points_to_csv(
                sub_points=asset_props.get("events", []),
                sub_point_type="events",
                default_configuration=default_configuration,
                portal_friendly=extension == FileType.csv.value
            )
        extension = extension.replace("-", ".")
        file_path = dump_content_to_file(
            content=asset_props.get("events", []),
            file_name=f"{asset_name}_events",
            extension=extension,
            fieldnames=fieldnames,
            output_dir=output_dir,
            replace=replace
        )
        return {"file_path": file_path}

    def import_events(
        self,
        asset_name: str,
        file_path: str,
        resource_group_name: str,
        replace: bool = False,
        **kwargs
    ):
        """Merge events from a file into an asset and return the resulting events.

        Events are matched by ``name``; existing events are kept unless
        ``replace`` is True.
        """
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        asset["properties"]["events"] = _process_asset_sub_points_file_path(
            file_path=file_path,
            original_items=asset["properties"].get("events", []),
            point_key="name",
            replace=replace
        )

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return asset["properties"]["events"]

    def list_events(
        self,
        asset_name: str,
        resource_group_name: str
    ):
        """Return the asset's events (empty list when the asset has none)."""
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
        )
        return asset["properties"].get("events", [])

    def remove_event(
        self,
        asset_name: str,
        event_name: str,
        resource_group_name: str,
        **kwargs
    ):
        """Remove every event with the given name and return the remaining events."""
        asset = self.show(
            asset_name=asset_name,
            resource_group_name=resource_group_name,
            check_cluster=True
        )
        asset["properties"]["events"] = [
            ev for ev in asset["properties"].get("events", []) if ev["name"] != event_name
        ]

        # note that update does not return the properties
        asset = self._replace_asset(asset_name, resource_group_name, asset, kwargs)
        return asset["properties"]["events"]


# New Helpers
def _upsert_sub_point(sub_points: List[dict], sub_point: dict) -> None:
    """Replace the first entry of ``sub_points`` with the same name, or append ``sub_point``."""
    for index, existing in enumerate(sub_points):
        if existing.get("name") == sub_point["name"]:
            sub_points[index] = sub_point
            return
    sub_points.append(sub_point)


def _get_dataset(asset: dict, dataset_name: str, create_if_none: bool = False):
    """Return the dataset dict named ``dataset_name`` from ``asset`` (the object inside the asset).

    A dataset with an empty name is treated as "default". When no dataset
    matches and ``create_if_none`` is True, an empty "default" dataset is added
    to the asset; any other name raises InvalidArgumentValueError, as does a
    missing dataset when ``create_if_none`` is False.
    """
    # ensure datasets will get populated if not there
    asset["properties"]["datasets"] = asset["properties"].get("datasets", [])
    datasets = asset["properties"]["datasets"]
    matched_datasets = [dset for dset in datasets if dset["name"] == dataset_name]
    # Temporary convert empty names to default
    if not matched_datasets and dataset_name == "default":
        matched_datasets = [dset for dset in datasets if dset["name"] == ""]
    # create if add or import (and no datasets yet)
    if not matched_datasets and create_if_none:
        if dataset_name != "default":
            raise InvalidArgumentValueError("Currently only one dataset with the name default is supported.")
        matched_datasets = [{}]
        datasets.extend(matched_datasets)
    elif not matched_datasets:
        raise InvalidArgumentValueError(f"Dataset {dataset_name} not found in asset {asset['name']}.")
    # note: right now we can have datasets with the same name but this will not be allowed later
    # part of the temporary convert
    matched_datasets[0]["name"] = dataset_name
    return matched_datasets[0]


def _build_topic(
    original_topic: Optional[Dict[str, str]] = None,
    topic_path: Optional[str] = None,
    topic_retain: Optional[str] = None
) -> Dict[str, str]:
    """Apply path/retain on top of ``original_topic`` (mutated when provided) and return it.

    Retain defaults to "Never" when neither given nor already present. Raises
    RequiredArgumentMissingError when the resulting topic has no path.
    """
    if not original_topic:
        original_topic = {}
    if topic_path:
        original_topic["path"] = topic_path
    if topic_retain:
        original_topic["retain"] = topic_retain
    elif not original_topic.get("retain"):
        original_topic["retain"] = "Never"

    if not original_topic.get("path"):
        raise RequiredArgumentMissingError("Topic path is needed for a topic configuration.")
    return original_topic


def _process_asset_sub_points_file_path(
    file_path: str,
    original_items: Optional[List[dict]] = None,
    point_key: Optional[str] = None,
    replace: bool = False
) -> List[Dict[str, str]]:
    """Load sub points from a file and optionally merge them with existing ones.

    With ``point_key`` None the file points are returned as is. Otherwise the
    points are merged with ``original_items`` keyed on ``point_key``: a file
    point whose key already exists is skipped with a warning unless ``replace``
    is True.
    """
    from ....util import deserialize_file_content
    file_points = list(deserialize_file_content(file_path=file_path))
    # normalizes csv headers/values in place
    _convert_sub_points_from_csv(file_points)

    if point_key is None:
        return file_points

    if not original_items:
        original_items = []
    original_points = {point[point_key]: point for point in original_items}
    file_points = {point[point_key]: point for point in file_points}
    for key in file_points:
        if key in original_points and not replace:
            logger.warning("%s is already present in the asset and will be ignored.", key)
        else:
            original_points[key] = file_points[key]
    return list(original_points.values())


def _build_query_body(
    asset_name: Optional[str] = None,
    default_topic_path: Optional[str] = None,
    default_topic_retain: Optional[str] = None,
    description: Optional[str] = None,
    disabled: Optional[bool] = None,
    display_name: Optional[str] = None,
    documentation_uri: Optional[str] = None,
    endpoint_profile: Optional[str] = None,
    external_asset_id: Optional[str] = None,
    hardware_revision: Optional[str] = None,
    location: Optional[str] = None,
    manufacturer: Optional[str] = None,
    manufacturer_uri: Optional[str] = None,
    model: Optional[str] = None,
    product_code: Optional[str] = None,
    resource_group_name: Optional[str] = None,
    serial_number: Optional[str] = None,
    software_revision: Optional[str] = None,
) -> str:
    """Build the filter + projection part of the asset resource query.

    One ``where`` clause is appended per provided (truthy) argument;
    ``disabled`` is applied whenever it is not None. The projection clauses are
    always appended.
    """
    query_body = ""
    if resource_group_name:
        query_body += f"| where resourceGroup =~ \"{resource_group_name}\""
    if location:
        query_body += f"| where location =~ \"{location}\""
    if asset_name:
        query_body += f"| where name =~ \"{asset_name}\""
    if default_topic_path:
        query_body += f"| where properties.defaultTopic.path =~ \"{default_topic_path}\""
    if default_topic_retain:
        query_body += f"| where properties.defaultTopic.retain =~ \"{default_topic_retain}\""
    if description:
        query_body += f"| where properties.description =~ \"{description}\""
    if display_name:
        query_body += f"| where properties.displayName =~ \"{display_name}\""
    if disabled is not None:
        query_body += f"| where properties.enabled == {not disabled}"
    if documentation_uri:
        query_body += f"| where properties.documentationUri =~ \"{documentation_uri}\""
    if endpoint_profile:
        # same property name that create() writes on the asset
        query_body += f"| where properties.assetEndpointProfileRef =~ \"{endpoint_profile}\""
    if external_asset_id:
        query_body += f"| where properties.externalAssetId =~ \"{external_asset_id}\""
    if hardware_revision:
        query_body += f"| where properties.hardwareRevision =~ \"{hardware_revision}\""
    if manufacturer:
        query_body += f"| where properties.manufacturer =~ \"{manufacturer}\""
    if manufacturer_uri:
        query_body += f"| where properties.manufacturerUri =~ \"{manufacturer_uri}\""
    if model:
        query_body += f"| where properties.model =~ \"{model}\""
    if product_code:
        query_body += f"| where properties.productCode =~ \"{product_code}\""
    if serial_number:
        query_body += f"| where properties.serialNumber =~ \"{serial_number}\""
    if software_revision:
        query_body += f"| where properties.softwareRevision =~ \"{software_revision}\""

    query_body += "| extend customLocation = tostring(extendedLocation.name) "\
        "| extend provisioningState = properties.provisioningState "\
        "| project id, customLocation, location, name, resourceGroup, provisioningState, tags, "\
        "type, subscriptionId "
    return query_body


# Helpers
def _build_asset_sub_point(
    data_source: Optional[str] = None,
    event_notifier: Optional[str] = None,
    name: Optional[str] = None,
    observability_mode: Optional[str] = None,
    queue_size: Optional[int] = None,
    sampling_interval: Optional[int] = None,
) -> Dict[str, str]:
    """Build a data point (when ``data_source`` is given) or an event (when ``event_notifier`` is given).

    The observability mode is capitalized, defaults to "None" and is validated
    against the set allowed for the sub point type; an invalid mode raises
    InvalidArgumentValueError. Queue size and sampling interval are stored as a
    JSON string in the type-specific configuration key.
    """
    custom_configuration = _build_default_configuration(
        original_configuration="{}",
        sampling_interval=sampling_interval,
        queue_size=queue_size
    )
    result = {"name": name}
    observability_mode = observability_mode.capitalize() if observability_mode else "None"

    if data_source:
        result["dataSource"] = data_source
        result["dataPointConfiguration"] = custom_configuration
        if observability_mode not in VALID_DATA_OBSERVABILITY_MODES:
            # sorted so that the message is stable (the modes are kept in a frozenset)
            raise InvalidArgumentValueError(
                INVALID_OBSERVABILITY_MODE_ERROR.format(
                    data_source, ', '.join(sorted(VALID_DATA_OBSERVABILITY_MODES))
                )
            )
    elif event_notifier:
        result["eventNotifier"] = event_notifier
        result["eventConfiguration"] = custom_configuration
        if observability_mode not in VALID_EVENT_OBSERVABILITY_MODES:
            raise InvalidArgumentValueError(
                INVALID_OBSERVABILITY_MODE_ERROR.format(
                    event_notifier, ', '.join(sorted(VALID_EVENT_OBSERVABILITY_MODES))
                )
            )

    result["observabilityMode"] = observability_mode
    return result


def _build_default_configuration(
    original_configuration: str,
    publishing_interval: Optional[int] = None,
    sampling_interval: Optional[int] = None,
    queue_size: Optional[int] = None,
) -> str:
    """Overlay the provided (truthy) values on a JSON configuration string and return it as JSON."""
    defaults = json.loads(original_configuration)
    if publishing_interval:
        defaults["publishingInterval"] = int(publishing_interval)
    if sampling_interval:
        defaults["samplingInterval"] = int(sampling_interval)
    if queue_size:
        defaults["queueSize"] = int(queue_size)
    return json.dumps(defaults)


def _build_ordered_csv_conversion_map(sub_point_type: str, portal_friendly: bool = False) -> Dict[str, str]:
    """Results in an ordered dict for headers"""
    from collections import OrderedDict
    csv_conversion_map = [
        ("queueSize", "QueueSize" if portal_friendly else "Queue Size"),
        ("observabilityMode", "ObservabilityMode" if portal_friendly else "Observability Mode"),
    ]
    if not portal_friendly or sub_point_type == "dataPoints":
        csv_conversion_map.append(("samplingInterval", "Sampling Interval Milliseconds"))
    if not portal_friendly:
        csv_conversion_map.append(("capabilityId", "Capability Id"))
    if sub_point_type == "dataPoints":
        csv_conversion_map.insert(0, ("dataSource", "NodeID" if portal_friendly else "Data Source"))
        csv_conversion_map.insert(1, ("name", "TagName" if portal_friendly else "Name"))
    else:
        csv_conversion_map.insert(0, ("eventNotifier", "EventNotifier" if portal_friendly else "Event Notifier"))
        csv_conversion_map.insert(1, ("name", "EventName" if portal_friendly else "Name"))

    # datasource, name, queuesize, observabilitymode, samplinginterval, capabilityid
    return OrderedDict(csv_conversion_map)


def _convert_sub_points_from_csv(sub_points: List[Dict[str, str]]):
    """Convert csv-style sub points to asset-style sub points, in place.

    Known csv headers are renamed to asset keys, the observability mode is
    capitalized, and sampling interval / queue size are moved into a JSON
    configuration string (data point or event configuration depending on
    whether the point has a ``dataSource``).
    """
    csv_conversion_map = {
        "CapabilityId": "capabilityId",
        "Capability Id": "capabilityId",
        "Data Source": "dataSource",
        "EventName": "name",
        "EventNotifier": "eventNotifier",
        "Event Notifier": "eventNotifier",
        "Name": "name",
        "NodeID": "dataSource",
        "ObservabilityMode": "observabilityMode",
        "Observability Mode": "observabilityMode",
        "QueueSize": "queueSize",
        "Queue Size": "queueSize",
        "Sampling Interval Milliseconds": "samplingInterval",
        "TagName": "name",
    }
    for point in sub_points:
        # point has csv values
        point.pop("", None)
        for key, value in csv_conversion_map.items():
            if key in point:
                point[value] = point.pop(key)

        # now the point has the normal values - do some final transformations
        if point.get("observabilityMode"):
            point["observabilityMode"] = point["observabilityMode"].capitalize()

        configuration = {}
        if point.get("samplingInterval"):
            configuration["samplingInterval"] = int(point.pop("samplingInterval"))
        else:
            point.pop("samplingInterval", None)
        if point.get("queueSize"):
            configuration["queueSize"] = int(point.pop("queueSize"))
        else:
            point.pop("queueSize", None)
        if configuration:
            config_key = "dataPointConfiguration" if "dataSource" in point else "eventConfiguration"
            point[config_key] = json.dumps(configuration)


def _convert_sub_points_to_csv(
    sub_points: List[Dict[str, str]],
    sub_point_type: str,
    default_configuration: str,
    portal_friendly: bool = False
) -> List[str]:
    """Convert asset-style sub points to csv-style rows, in place, and return the csv headers.

    ``sub_point_type`` is "dataPoints" or "events". In portal friendly mode,
    values missing from a point are filled from ``default_configuration`` (a
    JSON string); otherwise they are filled with None.
    """
    csv_conversion_map = _build_ordered_csv_conversion_map(sub_point_type, portal_friendly)
    default_configuration = json.loads(default_configuration) if portal_friendly else {}
    for point in sub_points:
        # "dataPoints" -> "dataPointConfiguration", "events" -> "eventConfiguration"
        configuration = point.pop(f"{sub_point_type[:-1]}Configuration", "{}")
        point.update(json.loads(configuration))
        if portal_friendly:
            point.pop("capabilityId", None)
            if sub_point_type == "events":
                point.pop("samplingInterval", None)
        for asset_key, csv_key in csv_conversion_map.items():
            point[csv_key] = point.pop(asset_key, default_configuration.get(asset_key))
    return list(csv_conversion_map.values())


def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]) -> List[Dict[str, str]]:
    """This is for the main create/update asset commands"""
    if not sub_points:
        return []
    point_type = "Data point" if required_arg == "data_source" else "Event"
    invalid_arg = "event_notifier" if required_arg == "data_source" else "data_source"
    processed_points = []
    for point in sub_points:
        parsed_points = assemble_nargs_to_dict(point)

        if not parsed_points.get(required_arg):
            raise RequiredArgumentMissingError(f"{point_type} ({point}) is missing the {required_arg}.")
        if parsed_points.get(invalid_arg):
            raise InvalidArgumentValueError(f"{point_type} does not support {invalid_arg}.")

        processed_point = _build_asset_sub_point(**parsed_points)
        processed_points.append(processed_point)

    return processed_points


def _process_custom_attributes(current_attributes: Dict[str, str], custom_attributes: List[str]):
    """Apply custom attributes to ``current_attributes`` in place; an empty value removes the key."""
    custom_attributes = assemble_nargs_to_dict(custom_attributes)
    for key, value in custom_attributes.items():
        if value == "":
            current_attributes.pop(key, None)
        else:
            current_attributes[key] = value


def _update_properties(
    properties: Dict[str, Union[str, List[Dict[str, str]]]],
    custom_attributes: Optional[List[str]] = None,
    default_topic_path: Optional[str] = None,
    default_topic_retain: Optional[str] = None,
    description: Optional[str] = None,
    disabled: Optional[bool] = None,
    display_name: Optional[str] = None,
    documentation_uri: Optional[str] = None,
    external_asset_id: Optional[str] = None,
    hardware_revision: Optional[str] = None,
    manufacturer: Optional[str] = None,
    manufacturer_uri: Optional[str] = None,
    model: Optional[str] = None,
    product_code: Optional[str] = None,
    serial_number: Optional[str] = None,
    software_revision: Optional[str] = None,
    ds_publishing_interval: Optional[int] = None,
    ds_sampling_interval: Optional[int] = None,
    ds_queue_size: Optional[int] = None,
    ev_publishing_interval: Optional[int] = None,
    ev_sampling_interval: Optional[int] = None,
    ev_queue_size: Optional[int] = None,
) -> None:
    """Apply the provided values to the asset ``properties`` dict, in place.

    String values are only written when truthy; ``disabled`` is written (as
    ``enabled``) whenever it is not None. The default datasets/events
    configurations are always rewritten, overlaying the provided values on the
    existing configuration. The default topic is only touched when a topic path
    or retain value is given.
    """
    if description:
        properties["description"] = description
    if disabled is not None:
        properties["enabled"] = not disabled
    if display_name:
        properties["displayName"] = display_name
    if documentation_uri:
        properties["documentationUri"] = documentation_uri
    if external_asset_id:
        properties["externalAssetId"] = external_asset_id
    if hardware_revision:
        properties["hardwareRevision"] = hardware_revision
    if manufacturer:
        properties["manufacturer"] = manufacturer
    if manufacturer_uri:
        properties["manufacturerUri"] = manufacturer_uri
    if model:
        properties["model"] = model
    if product_code:
        properties["productCode"] = product_code
    if serial_number:
        properties["serialNumber"] = serial_number
    if software_revision:
        properties["softwareRevision"] = software_revision

    if custom_attributes:
        if "attributes" not in properties:
            properties["attributes"] = {}
        _process_custom_attributes(
            properties["attributes"], custom_attributes=custom_attributes
        )

    # Defaults
    properties["defaultDatasetsConfiguration"] = _build_default_configuration(
        original_configuration=properties.get("defaultDatasetsConfiguration", "{}"),
        publishing_interval=ds_publishing_interval,
        sampling_interval=ds_sampling_interval,
        queue_size=ds_queue_size
    )

    properties["defaultEventsConfiguration"] = _build_default_configuration(
        original_configuration=properties.get("defaultEventsConfiguration", "{}"),
        publishing_interval=ev_publishing_interval,
        sampling_interval=ev_sampling_interval,
        queue_size=ev_queue_size
    )

    # TODO: unit test
    if any([default_topic_path, default_topic_retain]):
        properties["defaultTopic"] = _build_topic(
            original_topic=properties.get("defaultTopic"),
            topic_path=default_topic_path,
            topic_retain=default_topic_retain
        )