in experiments/arena/config/spanner_config.py [0:0]
def upsert_study_runs(self, study_runs: list[ArenaModelEvaluation], table_name: Optional[str] = "Study"):
    """Insert new study runs and update existing ones in one Spanner batch.

    Each run is serialized to a row whose columns are the field names
    reported by ``fields(ArenaModelEvaluation)``, in that order. Rows are
    split into an insert list and an update list and written through a
    single ``self.database.batch()`` context.

    A run is treated as an insert when it has no ``id`` (one is generated
    via ``self._generate_unique_id()``) or no ``time_of_rating`` (it is set
    to ``spanner.COMMIT_TIMESTAMP``). Every other run is treated as an
    update.

    Note that the passed-in objects are mutated in place: missing ``id`` and
    ``time_of_rating`` attributes are filled in on the caller's instances.

    Args:
        study_runs: Evaluations to write. An empty list performs no inserts
            or updates but still opens the batch and closes the connection.
        table_name: Name of the target table. Defaults to ``"Study"``.

    Raises:
        Exception: Wraps any error raised while writing the batch; the
            original error is chained as ``__cause__``.
    """
    # The column layout depends only on the model definition, so resolve it
    # once instead of once (twice, originally) per study run.
    run_fields = fields(ArenaModelEvaluation)
    columns = [field.name for field in run_fields]

    inserts = []
    updates = []
    current_timestamp = spanner.COMMIT_TIMESTAMP

    for study_run in study_runs:
        is_insert = False
        if not study_run.id:
            study_run.id = self._generate_unique_id()
            is_insert = True
        if not study_run.time_of_rating:
            study_run.time_of_rating = current_timestamp
            # NOTE(review): a run that already has an id but no
            # time_of_rating is also routed to the insert list; confirm this
            # is intended, since inserting an existing key would fail.
            is_insert = True
            log("Setting time_of_rating to commit timestamp as it was not provided.")

        values = []
        for field in run_fields:
            value = getattr(study_run, field.name)
            if isinstance(value, datetime):
                value = value.isoformat()
            if isinstance(value, Enum):
                # NOTE(review): str() on a plain Enum yields "Class.MEMBER"
                # unless the enum overrides __str__; verify this matches the
                # stored column format.
                value = str(value)
            values.append(value)

        if is_insert:
            inserts.append(values)
        else:
            updates.append(values)

    try:
        with self.database.batch() as batch:
            if inserts:
                log(f"Inserting {len(inserts)} new study runs into the database.")
                batch.insert(table_name, columns=columns, values=inserts)
            if updates:
                log(f"Updating {len(updates)} existing study runs in the database.")
                batch.update(table_name, columns=columns, values=updates)
        # Report success only after the batch context has exited without
        # raising, so the message is never emitted for a failed write.
        log(f"{len(study_runs)} study runs added/updated successfully in the database.")
    except Exception as e:
        raise Exception(f"Error adding study runs: {e}") from e
    finally:
        # Runs on both success and failure.
        self._close_connection()