def _get_filter()

in datastore/providers/milvus_datastore.py [0:0]


    def _get_filter(self, filter: DocumentMetadataFilter) -> Optional[str]:
        """Converts a DocumentMetdataFilter to the expression that Milvus takes.

        Args:
            filter (DocumentMetadataFilter): The Filter to convert to Milvus expression.

        Returns:
            Optional[str]: The filter if valid, otherwise None.
        """
        filters = []
        # Go through all the fields and their values
        for field, value in filter.dict().items():
            # Check if the Value is empty
            if value is not None:
                # Convert start_date to int and add greater than or equal logic
                if field == "start_date":
                    filters.append(
                        "(created_at >= " + str(to_unix_timestamp(value)) + ")"
                    )
                # Convert end_date to int and add less than or equal logic
                elif field == "end_date":
                    filters.append(
                        "(created_at <= " + str(to_unix_timestamp(value)) + ")"
                    )
                # Convert Source to its string value and check equivalency
                elif field == "source":
                    filters.append("(" + field + ' == "' + str(value.value) + '")')
                # Check equivalency of rest of string fields
                else:
                    filters.append("(" + field + ' == "' + str(value) + '")')
        # Join all our expressions with `and``
        return " and ".join(filters)