def up()

in jobs/mongodb_migration/src/mongodb_migration/migrations/_20240221160700_cache_merge_split_first_rows.py [0:0]


    def up(self) -> None:
        """Merge the STREAMING and PARQUET cache responses into the MERGED kind.

        The migration runs three steps, in order, on the cache responses
        collection:

        1. Delete every STREAMING or PARQUET response whose ``error_code`` is
           ``ResponseAlreadyComputedError``.
        2. For each remaining PARQUET response, look up the STREAMING response
           with the same dataset/config/split and keep exactly one of the two,
           relabelled as MERGED:

           - no STREAMING counterpart: keep the PARQUET response;
           - the PARQUET response has ``http_status == 200``: keep it and
             delete the STREAMING one;
           - otherwise: keep the STREAMING response and delete the PARQUET one.
        3. Relabel every STREAMING response that is still left as MERGED.

        Every relabelled response also gets ``job_runner_version`` set to
        ``JOB_RUNNER_VERSION``.
        """
        responses = get_db(CACHE_MONGOENGINE_ALIAS)[CACHE_COLLECTION_RESPONSES]
        # Same update document for every entry that survives the merge.
        to_merged = {"$set": {"kind": MERGED, "job_runner_version": JOB_RUNNER_VERSION}}

        logging.info(
            "Remove all the entries with 'error_code=ResponseAlreadyComputedError' for"
            " 'split-first-rows-from-streaming' and 'split-first-rows-from-parquet'"
        )
        responses.delete_many(
            {
                "kind": {"$in": [STREAMING, PARQUET]},
                "error_code": "ResponseAlreadyComputedError",
            }
        )

        logging.info("Update or delete all the 'split-first-rows-from-parquet' responses")
        # Only the identifying fields and the status are read below, so project
        # them instead of fetching each whole cached document ("_id" is
        # returned by default).
        parquet_entries = responses.find(
            {"kind": PARQUET},
            {"dataset": 1, "config": 1, "split": 1, "http_status": 1},
        )
        for parquet_entry in parquet_entries:
            streaming_entry = responses.find_one(
                {
                    "kind": STREAMING,
                    "dataset": parquet_entry["dataset"],
                    "config": parquet_entry["config"],
                    "split": parquet_entry["split"],
                },
                {"_id": 1},
            )
            if streaming_entry is None:
                kept, dropped = parquet_entry, None
            elif parquet_entry["http_status"] == 200:
                kept, dropped = parquet_entry, streaming_entry
            else:
                kept, dropped = streaming_entry, parquet_entry

            # Relabel first, then remove the counterpart (same order as before).
            responses.update_one({"_id": kept["_id"]}, to_merged)
            if dropped is not None:
                responses.delete_one({"_id": dropped["_id"]})

        logging.info("Update the remaining 'split-first-rows-from-streaming' entries to 'split-first-rows'")
        responses.update_many({"kind": STREAMING}, to_merged)