in jobs/mongodb_migration/src/mongodb_migration/migrations/_20240221103200_cache_merge_config_split_names.py [0:0]
def up(self) -> None:
    """Merge the two config-level split-names cache kinds into ``MERGED``.

    Steps performed on the ``CACHE_COLLECTION_RESPONSES`` collection:

    1. Delete every ``STREAMING`` or ``INFO`` entry whose ``error_code`` is
       ``"ResponseAlreadyComputedError"``.
    2. For each remaining ``INFO`` entry, look up a ``STREAMING`` entry with the
       same ``dataset`` and ``config``:

       - none found: convert the ``INFO`` entry to ``MERGED``;
       - the ``INFO`` entry has ``http_status == 200``: convert it to ``MERGED``
         and delete the ``STREAMING`` entry;
       - otherwise: convert the ``STREAMING`` entry to ``MERGED`` (whatever its
         own status) and delete the ``INFO`` entry.

    3. Convert every ``STREAMING`` entry still present to ``MERGED``.

    Every converted entry also gets ``job_runner_version`` set to
    ``JOB_RUNNER_VERSION``.
    """
    db = get_db(CACHE_MONGOENGINE_ALIAS)
    responses = db[CACHE_COLLECTION_RESPONSES]
    # Single update payload shared by every entry that becomes the merged kind.
    to_merged = {"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}}
    # NOTE(review): if a MERGED entry may already exist for the same dataset/config
    # and the collection has a unique index on kind+dataset+config, the updates
    # below could raise a duplicate-key error — confirm against the schema.

    # Both kinds are cleaned here, so the log message names both of them.
    logging.info(
        "Remove all the entries with 'error_code=ResponseAlreadyComputedError' for"
        " 'config-split-names-from-streaming' and 'config-split-names-from-info'"
    )
    responses.delete_many(
        {
            "kind": {"$in": [STREAMING, INFO]},
            "error_code": "ResponseAlreadyComputedError",
        }
    )

    logging.info("Update or delete all the 'config-split-names-from-info' responses")
    for info_entry in responses.find({"kind": INFO}):
        streaming_entry = responses.find_one(
            {
                "kind": STREAMING,
                "dataset": info_entry["dataset"],
                "config": info_entry["config"],
            }
        )
        # Short-circuit: http_status is only read when a streaming entry exists,
        # exactly as in the original if/elif chain.
        if streaming_entry is None or info_entry["http_status"] == 200:
            # Keep the info response; drop the streaming duplicate if there is one.
            responses.update_one({"_id": info_entry["_id"]}, to_merged)
            if streaming_entry is not None:
                responses.delete_one({"_id": streaming_entry["_id"]})
        else:
            # Info response is non-200: keep the streaming response instead.
            responses.update_one({"_id": streaming_entry["_id"]}, to_merged)
            responses.delete_one({"_id": info_entry["_id"]})

    logging.info("Update the remaining 'config-split-names-from-streaming' entries to 'config-split-names'")
    responses.update_many({"kind": STREAMING}, to_merged)