def process_query()

in src/libs/udq_helper_utils/udq_utils/udq_models.py [0:0]


    def process_query(self, lambda_event):
        from udq_utils.udq import SingleEntityReader, MultiEntityReader

        # parse the raw lambda event into a structured IoTTwinMakerUdqRequest request object
        request = IoTTwinMakerUdqRequest.parse(lambda_event)

        # invoke the approriate entity reader function based on the request, or throw error if not supported
        if (isinstance(request, IoTTwinMakerUDQEntityRequest)):
            if isinstance(self, SingleEntityReader):
                udq_response = self.entity_query(request)
            else:
                raise NotImplementedError(f"Received entity request but this processor ({self.__class__.__name__}) doesn't support it")
        elif (isinstance(request, IoTTwinMakerUDQComponentTypeRequest)):
            if isinstance(self, MultiEntityReader):
                udq_response = self.component_type_query(request)
            else:
                raise NotImplementedError(f"Received component type request but this processor ({self.__class__.__name__}) doesn't support it")
        else:
            raise NotImplementedError(f"Received unknown UDQ request type: {lambda_event}")

        # inline helper to marshall python native types into common IoT TwinMaker types
        def serialize_value(val):
            if type(val) is str:
                return {
                    'stringValue': val
                }
            elif type(val) is float:
                return {
                    'doubleValue': str(val) # Note: the UDQ interface expects string value returns instead of JSON-native types
                }
            elif type(val) is bool:
                return {
                    'booleanValue': str(val)
                }
            else:
                assert False

        # marshall data rows into property values grouped by entityPropertyReference
        entity_prop_ref_to_values = {}
        for row in udq_response.rows:
            ref = row.get_iottwinmaker_reference()
            if ref not in entity_prop_ref_to_values:
                entity_prop_ref_to_values[ref] = []
            entity_prop_ref_to_values[ref].append({
                'timestamp': int(row.get_timestamp().timestamp()),
                'value': serialize_value(row.get_value())
            })

        # marshall the entity_prop_ref_to_values into response propertyValues structure
        property_values = []
        for ref in entity_prop_ref_to_values:
            property_values.append({
                'entityPropertyReference': ref.serialize(),
                'values': entity_prop_ref_to_values[ref]
            })

        # marshall propertyValues and nextToken into final UDQ response
        return {
            'propertyValues': property_values,
            'nextToken': udq_response.next_token if udq_response.next_token else None
        }