in src/libs/udq_helper_utils/udq_utils/udq_models.py [0:0]
def process_query(self, lambda_event):
    """Handle a UDQ Lambda invocation and build the response payload.

    The raw event is parsed into a request object, dispatched to the reader
    method matching the request kind, and the returned rows are marshalled
    into the ``propertyValues`` / ``nextToken`` response structure.

    Args:
        lambda_event: The raw Lambda event, passed unchanged to
            ``IoTTwinMakerUdqRequest.parse``.

    Returns:
        A dict with two keys: ``propertyValues``, a list of
        ``{'entityPropertyReference': ..., 'values': [...]}`` entries (one per
        distinct reference, in first-seen order), and ``nextToken``, which is
        the reader result's ``next_token`` or ``None`` when that is falsy.

    Raises:
        NotImplementedError: If the request kind is unknown, or this
            processor does not implement the reader interface it requires.
        AssertionError: If a row value is not a ``str``, ``float`` or ``bool``.
    """
    # Imported at call time rather than module level
    # (NOTE(review): presumably to avoid a circular import -- confirm).
    from udq_utils.udq import SingleEntityReader, MultiEntityReader

    # parse the raw lambda event into a structured IoTTwinMakerUdqRequest request object
    request = IoTTwinMakerUdqRequest.parse(lambda_event)

    # invoke the appropriate entity reader function based on the request, or throw error if not supported
    if isinstance(request, IoTTwinMakerUDQEntityRequest):
        if isinstance(self, SingleEntityReader):
            udq_response = self.entity_query(request)
        else:
            raise NotImplementedError(f"Received entity request but this processor ({self.__class__.__name__}) doesn't support it")
    elif isinstance(request, IoTTwinMakerUDQComponentTypeRequest):
        if isinstance(self, MultiEntityReader):
            udq_response = self.component_type_query(request)
        else:
            raise NotImplementedError(f"Received component type request but this processor ({self.__class__.__name__}) doesn't support it")
    else:
        raise NotImplementedError(f"Received unknown UDQ request type: {lambda_event}")

    # inline helper to marshall python native types into common IoT TwinMaker types
    def serialize_value(val):
        if isinstance(val, str):
            return {'stringValue': val}
        if isinstance(val, float):
            # Note: the UDQ interface expects string value returns instead of JSON-native types
            return {'doubleValue': str(val)}
        if isinstance(val, bool):
            return {'booleanValue': str(val)}
        # Raised explicitly instead of `assert False`: a bare assert is removed
        # under `python -O`, which would let None leak into the response.
        # AssertionError is kept so the failure type seen by callers is unchanged.
        raise AssertionError(f"Unsupported UDQ value type: {type(val).__name__}")

    # marshall data rows into property values grouped by entityPropertyReference
    values_by_ref = {}
    for row in udq_response.rows:
        ref = row.get_iottwinmaker_reference()
        values_by_ref.setdefault(ref, []).append({
            # int() drops any fractional part of the timestamp.
            # NOTE(review): assumes get_timestamp() returns a datetime; a naive
            # datetime would be interpreted in local time -- confirm.
            'timestamp': int(row.get_timestamp().timestamp()),
            'value': serialize_value(row.get_value()),
        })

    # marshall the grouped values into the response propertyValues structure
    property_values = [
        {
            'entityPropertyReference': ref.serialize(),
            'values': values,
        }
        for ref, values in values_by_ref.items()
    ]

    # marshall propertyValues and nextToken into final UDQ response
    return {
        'propertyValues': property_values,
        'nextToken': udq_response.next_token or None,
    }