in jobs/mongodb_migration/src/mongodb_migration/migrations/_20240112164500_cache_add_partial_field_in_split_descriptive_statistics.py [0:0]
def up(self) -> None:
    """Backfill ``content.partial`` in successful split-descriptive-statistics cache entries.

    Only entries of kind ``split-descriptive-statistics`` with ``http_status`` 200
    that do not have a ``content.partial`` field yet are modified:

    - ``content.partial`` is set to ``True`` when the entry's
      (dataset, config, split) matches a parquet file, listed in a
      ``config-parquet`` entry whose ``content.partial`` is ``True``, whose URL
      is flagged by ``parquet_export_is_partial``;
    - ``content.partial`` is set to ``False`` for every other such entry.
    """
    # See https://docs.mongoengine.org/guide/migration.html#example-1-addition-of-a-field
    logging.info(
        "If missing, add the 'partial' field (True for partial parquet exports, False otherwise)"
        " to the successful cached results of split-descriptive-statistics job runner"
    )
    responses = get_db(CACHE_MONGOENGINE_ALIAS)[CACHE_COLLECTION_RESPONSES]

    # Only the identifying fields and the parquet file list are read below, so
    # restrict what the server returns instead of fetching whole documents.
    partial_configs_entries = responses.find(
        {
            "kind": "config-parquet",
            "content.partial": True,
        },
        projection={"dataset": 1, "config": 1, "content.parquet_files": 1},
    )
    partial_splits = {
        (entry["dataset"], entry["config"], file["split"])
        for entry in partial_configs_entries
        for file in entry["content"]["parquet_files"]
        if parquet_export_is_partial(file["url"])
    }

    # Shared selector: successful statistics entries that still lack the field.
    missing_partial_filter = {
        "kind": "split-descriptive-statistics",
        "http_status": 200,
        "content.partial": {"$exists": False},
    }
    stats_successful_entries = responses.find(
        missing_partial_filter,
        projection={"dataset": 1, "config": 1, "split": 1},
    )
    partial_stats_successful_ids = [
        entry["_id"]
        for entry in stats_successful_entries
        if (entry["dataset"], entry["config"], entry["split"]) in partial_splits
    ]

    # set partial: true in successful partial entries first ...
    if partial_stats_successful_ids:
        responses.update_many(
            {**missing_partial_filter, "_id": {"$in": partial_stats_successful_ids}},
            {"$set": {"content.partial": True}},
        )
    # ... so that every entry still lacking the field is a non-partial one.
    # This yields the same final state as excluding the ids with "$nin",
    # without sending the (potentially very large) id list a second time.
    responses.update_many(
        missing_partial_filter,
        {"$set": {"content.partial": False}},
    )