in src/libs/udq_helper_utils/udq_utils/udq_models.py [0:0]
def __init__(self, event):
self._event = event
self._udq_context = {
'workspace_id': IoTTwinMakerUdqRequest.get_required_field(event, 'workspaceId'),
'properties': self._event['properties']
}
self._entityId = event.get('entityId')
self._componentName = event.get('componentName')
# entityId and componentName must both appear if at all
if (self._entityId and not self._componentName) or (not self._entityId and self._componentName):
raise Exception("EntityId and componentName must show up together")
self._componentTypeId = event.get('componentTypeId')
self._selectedProperties = IoTTwinMakerUdqRequest.get_required_field(event, 'selectedProperties')
# validate the selected properties
# verify each selected property is in the properties map from the event
allowed_props = self._udq_context['properties'].keys()
if(len(self._selectedProperties) < 1):
raise Exception('Unexpected selectedProperties[{}]'.format(self._selectedProperties))
for selectedProperty in self._selectedProperties:
if selectedProperty not in allowed_props and selectedProperty != 'alarm_status':
raise Exception(f"selectedProperty: {selectedProperty} not found in entity/component definition. Allowed properties: {allowed_props}")
def get_required_datetime_field(event, field_name):
time_in_sec = IoTTwinMakerUdqRequest.get_required_field(event, field_name)
try:
return datetime.utcfromtimestamp(time_in_sec)
except:
raise Exception(f"Timestamp[{time_in_sec}] could not be converted to IS8601")
self._startDateTime = get_required_datetime_field(self._event, 'startDateTime')
self._endDateTime = get_required_datetime_field(self._event, 'endDateTime')
self._nextToken = event.get('nextToken')
self._maxRows = event.get('maxResults')
def get_order_by(event):
orderByTime = event.get('orderByTime')
if not orderByTime or orderByTime == 'ASCENDING':
return OrderBy.ASCENDING
elif orderByTime == 'DESCENDING':
return OrderBy.DESCENDING
else:
raise Exception(f"Unsupported OrderBy type: [{orderByTime}]")
self._orderBy = get_order_by(self._event)
self._property_filters = self._event.get('propertyFilters', [])