in datastore/providers/milvus_datastore.py [0:0]
def _get_values(self, chunk: DocumentChunk) -> List[any] | None: # type: ignore
"""Convert the chunk into a list of values to insert whose indexes align with fields.
Args:
chunk (DocumentChunk): The chunk to convert.
Returns:
List (any): The values to insert.
"""
# Convert DocumentChunk and its sub models to dict
values = chunk.dict()
# Unpack the metadata into the same dict
meta = values.pop("metadata")
values.update(meta)
# Convert date to int timestamp form
if values["created_at"]:
values["created_at"] = to_unix_timestamp(values["created_at"])
# If source exists, change from Source object to the string value it holds
if values["source"]:
values["source"] = values["source"].value
# List to collect data we will return
ret = []
# Grab data responding to each field, excluding the hidden auto pk field for schema V1
offset = 1 if self._schema_ver == "V1" else 0
for key, _, default in self._get_schema()[offset:]:
# Grab the data at the key and default to our defaults set in init
x = values.get(key) or default
# If one of our required fields is missing, ignore the entire entry
if x is Required:
logger.info("Chunk " + values["id"] + " missing " + key + " skipping")
return None
# Add the corresponding value if it passes the tests
ret.append(x)
return ret