in evals/record.py [0:0]
def _flush_events_internal(self, events_to_write: Sequence[Event]):
    """Flush a batch of events to Snowflake and to the local event file.

    Serializes each event to a JSON line, inserts the rows into the Snowflake
    ``events`` table in chunks kept under ``MAX_SNOWFLAKE_BYTES``, then appends
    all lines to ``self.event_file_path``. Holds ``self._writing_lock`` for the
    entire flush so concurrent flushes cannot interleave.

    Args:
        events_to_write: Events to persist in this flush.

    Raises:
        TypeError: If any event fails JSON serialization (re-raised after logging).
        ValueError: If a single serialized event alone exceeds MAX_SNOWFLAKE_BYTES,
            since it can never fit in any batch.
    """
    with self._writing_lock:
        try:
            lines = [jsondumps(event) + "\n" for event in events_to_write]
        except TypeError:
            logger.error(f"Failed to serialize events: {events_to_write}")
            # Bare raise preserves the original traceback (raise e would truncate it).
            raise
        idx_l = 0
        while idx_l < len(events_to_write):
            # Grow the batch [idx_l, idx_r) until adding one more line would
            # push the payload past the Snowflake size limit.
            total_bytes = 0
            idx_r = idx_l
            while (
                idx_r < len(events_to_write)
                and total_bytes + len(lines[idx_r]) < MAX_SNOWFLAKE_BYTES
            ):
                total_bytes += len(lines[idx_r])
                idx_r += 1
            if idx_r == idx_l:
                # A single serialized event is larger than the limit. The previous
                # `assert idx_r > idx_l` here would be stripped under `python -O`,
                # turning this case into an infinite loop; fail loudly instead.
                raise ValueError(
                    f"Serialized event at index {idx_l} is {len(lines[idx_l])} bytes, "
                    f"which exceeds MAX_SNOWFLAKE_BYTES ({MAX_SNOWFLAKE_BYTES})"
                )
            start = time.time()
            buffer = [
                (
                    event.run_id,
                    event.event_id,
                    event.sample_id,
                    event.type,
                    jsondumps(event.data),
                    event.created_by,
                    event.created_at,
                )
                for event in events_to_write[idx_l:idx_r]
            ]
            # Parameterized multi-row insert: values are bound via seqparams,
            # never interpolated into the SQL string.
            query = """
        INSERT INTO events (run_id, event_id, sample_id, type, data, created_by, created_at)
        SELECT Column1 AS run_id, Column2 as event_id, Column3 AS sample_id, Column4 AS type, PARSE_JSON(Column5) AS data, Column6 AS created_by, Column7 AS created_at
        FROM VALUES(%s, %s, %s, %s, %s, %s, %s)
        """
            self._conn.robust_query(command=query, seqparams=buffer, many=True)
            logger.info(
                f"Logged {len(buffer)} rows of events to Snowflake: insert_time={t(time.time()-start)}"
            )
            idx_l = idx_r
        # Append every serialized line to the blob-backed event file in one write.
        with bf.BlobFile(self.event_file_path, "ab") as f:
            f.write("".join(lines).encode("utf-8"))
        self._last_flush_time = time.time()
        self._flushes_done += 1