in datastore/providers/milvus_datastore.py [0:0]
def _get_filter(self, filter: DocumentMetadataFilter) -> Optional[str]:
    """Convert a DocumentMetadataFilter into a Milvus boolean expression.

    Every field of the filter whose value is not None contributes one
    parenthesised clause, and the clauses are combined with ``and``:

    * ``start_date`` -> ``(created_at >= <to_unix_timestamp(value)>)``
    * ``end_date``   -> ``(created_at <= <to_unix_timestamp(value)>)``
    * ``source``     -> ``(source == "<value.value>")``
    * anything else  -> ``(<field> == "<str(value)>")``

    Args:
        filter (DocumentMetadataFilter): The filter to convert to a Milvus
            expression.

    Returns:
        Optional[str]: The expression string. When no field of the filter is
            set, the result is the empty string (not None), so the result can
            be checked with ``len()`` or truthiness.
    """

    def quoted(text: str) -> str:
        # Values end up inside a double-quoted literal of the expression.
        # Backslashes are escaped first so the backslash added in front of
        # each double quote is not itself doubled. Without this, a value
        # containing `"` would terminate the literal early and could append
        # arbitrary clauses to the expression.
        escaped = text.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    clauses = []
    for field, value in filter.dict().items():
        # Unset fields place no constraint on the result.
        if value is None:
            continue
        if field == "start_date":
            # Lower bound on the stored `created_at` value.
            clauses.append(f"(created_at >= {to_unix_timestamp(value)!s})")
        elif field == "end_date":
            # Upper bound on the stored `created_at` value.
            clauses.append(f"(created_at <= {to_unix_timestamp(value)!s})")
        elif field == "source":
            # `source` is compared by its `.value`, not by the object itself.
            clauses.append(f"(source == {quoted(str(value.value))})")
        else:
            # Remaining fields are compared as strings for equality.
            clauses.append(f"({field} == {quoted(str(value))})")
    # An empty clause list joins to "", which is the "no filter" result.
    return " and ".join(clauses)