in azext_edge/edge/providers/rpsaas/adr/assets.py [0:0]
def _process_asset_sub_points(required_arg: str, sub_points: Optional[List[str]]) -> List[Dict[str, str]]:
"""This is for the main create/update asset commands"""
if not sub_points:
return []
point_type = "Data point" if required_arg == "data_source" else "Event"
invalid_arg = "event_notifier" if required_arg == "data_source" else "data_source"
processed_points = []
for point in sub_points:
parsed_points = assemble_nargs_to_dict(point)
if not parsed_points.get(required_arg):
raise RequiredArgumentMissingError(f"{point_type} ({point}) is missing the {required_arg}.")
if parsed_points.get(invalid_arg):
raise InvalidArgumentValueError(f"{point_type} does not support {invalid_arg}.")
processed_point = _build_asset_sub_point(**parsed_points)
processed_points.append(processed_point)
return processed_points