in jobs/mongodb_migration/src/mongodb_migration/migrations/_20240221160700_cache_merge_split_first_rows.py
def up(self) -> None:
    db = get_db(CACHE_MONGOENGINE_ALIAS)
    # Delete the cached responses of either kind that errored with 'ResponseAlreadyComputedError'.
    logging.info(
        "Remove all the entries with 'error_code=ResponseAlreadyComputedError' for"
        " 'split-first-rows-from-streaming' and 'split-first-rows-from-parquet'"
    )
    db[CACHE_COLLECTION_RESPONSES].delete_many(
        {
            "kind": {"$in": [STREAMING, PARQUET]},
            "error_code": "ResponseAlreadyComputedError",
        }
    )
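    # For each 'split-first-rows-from-parquet' response, decide whether it or its
    # 'split-first-rows-from-streaming' counterpart becomes the merged 'split-first-rows' entry.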
logging.info("Update or delete all the 'split-first-rows-from-parquet' responses")
for parquet_entry in db[CACHE_COLLECTION_RESPONSES].find({"kind": PARQUET}):
streaming_entry = db[CACHE_COLLECTION_RESPONSES].find_one(
{
"kind": STREAMING,
"dataset": parquet_entry["dataset"],
"config": parquet_entry["config"],
"split": parquet_entry["split"],
}
)
if streaming_entry is None:
db[CACHE_COLLECTION_RESPONSES].update_one(
{"_id": parquet_entry["_id"]}, {"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}}
)
elif parquet_entry["http_status"] == 200:
db[CACHE_COLLECTION_RESPONSES].update_one(
{"_id": parquet_entry["_id"]}, {"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}}
)
db[CACHE_COLLECTION_RESPONSES].delete_one({"_id": streaming_entry["_id"]})
else:
db[CACHE_COLLECTION_RESPONSES].update_one(
{"_id": streaming_entry["_id"]},
{"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}},
)
db[CACHE_COLLECTION_RESPONSES].delete_one({"_id": parquet_entry["_id"]})
logging.info("Update the remaning 'split-first-rows-from-streaming' entries to 'split-first-rows'")
db[CACHE_COLLECTION_RESPONSES].update_many(
{"kind": STREAMING}, {"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}}
)