in jobs/mongodb_migration/src/mongodb_migration/migrations/_20221117223000_cache_generic_response.py [0:0]
def up(self) -> None:
    """Fill the generic cached-response collection from the two legacy collections.

    Every document returned by ``find()`` on the splits collection, then on the
    first-rows collection, is re-shaped and inserted (one ``insert_one`` call
    per document) into the collection named by ``cachedResponseCollection``.
    The legacy "_id" is reused, "response" becomes "content", and a "kind"
    field records which legacy collection the document came from. The source
    collections are only read here.

    Each pass runs entirely inside ``contextlib.suppress(InvalidName)``, so an
    ``InvalidName`` raised anywhere in a pass (lookup, iteration or insert)
    silently ends that pass.
    NOTE(review): InvalidName is presumably pymongo's error for an illegal
    collection name - confirm against the file's imports.

    Raises:
        ValueError: if a document yielded by a source collection is not a dict.
    """
    # See https://docs.mongoengine.org/guide/migration.html#example-1-addition-of-a-field
    logging.info(
        f"Create the {cachedResponseCollection} collection, and fill it with the data from splits and first-rows"
    )
    database = get_db(db_name)

    # Pass 1: legacy splits responses -> generic cached responses.
    with contextlib.suppress(InvalidName):
        for legacy_splits in database[splitsResponseCollection].find():
            if not isinstance(legacy_splits, dict):
                # Narrows the type for mypy.
                raise ValueError("splits_response should be a dict")
            cached_splits = {
                "_id": legacy_splits.get("_id"),
                # "kind" does not exist in the legacy documents.
                "kind": SPLITS_KIND,
                "dataset": legacy_splits.get("dataset_name"),
                # A splits entry is dataset-wide: no config, no split.
                "config": None,
                "split": None,
                "http_status": legacy_splits.get("http_status"),
                "error_code": legacy_splits.get("error_code"),
                # Legacy "response" is stored under "content".
                "content": legacy_splits.get("response"),
                "worker_version": legacy_splits.get("worker_version"),
                "dataset_git_revision": legacy_splits.get("dataset_git_revision"),
                "details": legacy_splits.get("details"),
                "updated_at": legacy_splits.get("updated_at"),
                # The legacy "stale" field is intentionally not copied.
            }
            database[cachedResponseCollection].insert_one(cached_splits)

    # Pass 2: legacy first-rows responses -> generic cached responses.
    with contextlib.suppress(InvalidName):
        for legacy_first_rows in database[firstRowsResponseCollection].find():
            if not isinstance(legacy_first_rows, dict):
                # Narrows the type for mypy.
                raise ValueError("first_rows_response should be a dict")
            cached_first_rows = {
                "_id": legacy_first_rows.get("_id"),
                # "kind" does not exist in the legacy documents.
                "kind": FIRST_ROWS_KIND,
                "dataset": legacy_first_rows.get("dataset_name"),
                # Unlike the splits pass, config and split are copied from
                # the legacy "config_name" / "split_name" fields.
                "config": legacy_first_rows.get("config_name"),
                "split": legacy_first_rows.get("split_name"),
                "http_status": legacy_first_rows.get("http_status"),
                "error_code": legacy_first_rows.get("error_code"),
                # Legacy "response" is stored under "content".
                "content": legacy_first_rows.get("response"),
                "worker_version": legacy_first_rows.get("worker_version"),
                "dataset_git_revision": legacy_first_rows.get("dataset_git_revision"),
                "details": legacy_first_rows.get("details"),
                "updated_at": legacy_first_rows.get("updated_at"),
                # The legacy "stale" field is intentionally not copied.
            }
            database[cachedResponseCollection].insert_one(cached_first_rows)