jobs/mongodb_migration/src/mongodb_migration/deletion_migrations.py (155 lines of code) (raw):
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 The HuggingFace Authors.
import logging
from collections.abc import Mapping
from typing import Any, Optional
from libcommon.queue.jobs import JobDocument
from libcommon.simple_cache import CachedResponseDocument
from mongoengine.connection import get_db
from mongodb_migration.check import check_documents
from mongodb_migration.migration import (
BaseCacheMigration,
BaseQueueMigration,
CacheMigration,
IrreversibleMigrationError,
Migration,
QueueMigration,
)
class MetricsDeletionMigration(Migration):
    """Deprecated placeholder migration: every step only logs a message.

    ``job_type`` and ``cache_kind`` are used solely to build the default
    description; they are not stored on the instance.
    """

    def __init__(self, job_type: str, cache_kind: str, version: str, description: Optional[str] = None):
        """Register the migration, generating a description when none (or an empty one) is given."""
        resolved_description = description or (
            f"[deprecated] no-op migration for job type '{job_type}' and cache kind '{cache_kind}'"
        )
        super().__init__(version=version, description=resolved_description)

    def up(self) -> None:
        """Do nothing except log that the migration is a no-op."""
        logging.info("[deprecated] no-op")

    def down(self) -> None:
        """Do nothing except log that the rollback is a no-op."""
        logging.info("[deprecated] no-op")

    def validate(self) -> None:
        """Do nothing except log that the validation is a no-op."""
        logging.info("[deprecated] no-op")
class CacheDeletionMigration(CacheMigration):
    """Migration that deletes every cache entry of one kind.

    The kind is forwarded to the parent constructor and later read from
    ``self.cache_kind``. The migration cannot be rolled back.
    """

    def __init__(self, cache_kind: str, version: str, description: Optional[str] = None):
        """Register the migration, generating a description when none (or an empty one) is given."""
        resolved_description = description or f"delete the cache entries of kind '{cache_kind}'"
        super().__init__(cache_kind=cache_kind, version=version, description=resolved_description)

    def up(self) -> None:
        """Delete all documents whose ``kind`` equals ``self.cache_kind`` and log how many were removed."""
        logging.info(f"Delete cache entries of kind {self.cache_kind}")
        responses = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_RESPONSES]
        outcome = responses.delete_many({"kind": self.cache_kind})
        logging.info(f"{outcome.deleted_count} deleted cache entries")

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Raise ``ValueError`` if any document of kind ``self.cache_kind`` is still present."""
        logging.info(f"Check that none of the documents has the {self.cache_kind} kind")
        responses = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_RESPONSES]
        leftover_count = responses.count_documents({"kind": self.cache_kind})
        if leftover_count:
            raise ValueError(f"Found documents with kind {self.cache_kind}")
class QueueDeletionMigration(QueueMigration):
    """Migration that deletes every job of one type.

    The type is forwarded to the parent constructor and later read from
    ``self.job_type``. The migration cannot be rolled back.
    """

    def __init__(self, job_type: str, version: str, description: Optional[str] = None):
        """Register the migration, generating a description when none (or an empty one) is given."""
        resolved_description = description or f"delete the jobs of type '{job_type}'"
        super().__init__(job_type=job_type, version=version, description=resolved_description)

    def up(self) -> None:
        """Delete all documents whose ``type`` equals ``self.job_type`` and log how many were removed."""
        logging.info(f"Delete jobs of type {self.job_type}")
        jobs = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_JOBS]
        outcome = jobs.delete_many({"type": self.job_type})
        logging.info(f"{outcome.deleted_count} deleted jobs")

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Raise ``ValueError`` if any document of type ``self.job_type`` is still present."""
        logging.info(f"Check that none of the documents has the {self.job_type} type")
        jobs = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_JOBS]
        leftover_count = jobs.count_documents({"type": self.job_type})
        if leftover_count:
            raise ValueError(f"Found documents with type {self.job_type}")
def get_index_names(index_information: Mapping[str, Any], field_name: str) -> list[str]:
    """Return the names of the TTL indexes defined on ``field_name`` alone.

    An entry of ``index_information`` is selected when its value is a dict
    that contains an ``expireAfterSeconds`` entry and whose ``key`` entry is
    exactly ``[(field_name, 1)]``.

    Args:
        index_information: mapping from index name to index description.
        field_name: name of the indexed field to look for.

    Returns:
        The matching index names, in the iteration order of the mapping.
    """
    expected_key = [(field_name, 1)]
    selected: list[str] = []
    for index_name, spec in index_information.items():
        # Skip anything that is not a dict description.
        if not isinstance(spec, dict):
            continue
        # Only TTL indexes carry "expireAfterSeconds"; "key" is needed for the comparison.
        if "expireAfterSeconds" not in spec or "key" not in spec:
            continue
        if spec["key"] == expected_key:
            selected.append(index_name)
    return selected
class MigrationQueueDeleteTTLIndex(BaseQueueMigration):
    """Migration that drops the single TTL index defined on one field of the jobs collection.

    The migration cannot be rolled back.
    """

    def __init__(self, version: str, description: str, field_name: str):
        """Register the migration and remember the field whose TTL index must be dropped."""
        super().__init__(version=version, description=description)
        self.field_name = field_name

    def up(self) -> None:
        """Drop the TTL index on ``self.field_name``.

        Raises:
            ValueError: if the number of TTL indexes found on the field is not exactly one.
        """
        logging.info(
            f"Delete ttl index on field {self.field_name}. Mongoengine will create it again with a different TTL"
            " parameter"
        )
        jobs = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_JOBS]
        matching_names = get_index_names(index_information=jobs.index_information(), field_name=self.field_name)
        # Refuse to guess which index to drop when zero or several match.
        if len(matching_names) != 1:
            raise ValueError(f"Expected 1 ttl index on field {self.field_name}, found {len(matching_names)}")
        jobs.drop_index(matching_names[0])

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Raise ``ValueError`` if a TTL index on ``self.field_name`` is still present."""
        logging.info("Check that the index does not exists anymore")
        jobs = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_JOBS]
        remaining_names = get_index_names(index_information=jobs.index_information(), field_name=self.field_name)
        if remaining_names:
            raise ValueError(f"Found TTL index for field {self.field_name}")
class MigrationDeleteIndex(Migration):
    """Migration that drops one index, identified by name, from a given collection.

    ``database`` is the value handed to ``get_db`` and ``collection`` is the
    collection name used to index into the returned database. The migration
    cannot be rolled back.
    """

    def __init__(self, version: str, description: str, database: str, collection: str, index_name: str):
        """Register the migration and remember which index to drop and where."""
        super().__init__(version=version, description=description)
        self.database = database
        self.collection = collection
        self.index_name = index_name

    def up(self) -> None:
        """Drop the index named ``self.index_name``."""
        # NOTE(review): the message says "ttl index" although nothing here checks the index type.
        logging.info(f"Delete ttl index {self.index_name}.")
        target_collection = get_db(self.database)[self.collection]
        target_collection.drop_index(self.index_name)

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Raise ``ValueError`` if ``self.index_name`` is still listed among the collection's indexes."""
        logging.info("Check that the index does not exists anymore")
        target_collection = get_db(self.database)[self.collection]
        existing_indexes = target_collection.index_information()
        if self.index_name in existing_indexes:
            raise ValueError(f"Index still exists: {self.index_name}")
class MigrationDeleteJobsByStatus(BaseQueueMigration):
    """Migration that deletes every job whose status is in a given list.

    The migration cannot be rolled back.
    """

    def __init__(self, status_list: list[str], version: str, description: str):
        """Register the migration and remember the statuses whose jobs must be deleted."""
        super().__init__(version=version, description=description)
        self.status_list = status_list

    def up(self) -> None:
        """Delete all jobs whose ``status`` is in ``self.status_list`` and log how many were removed."""
        logging.info(f"Delete jobs with status {self.status_list}.")
        db = get_db(self.MONGOENGINE_ALIAS)
        result = db[self.COLLECTION_JOBS].delete_many({"status": {"$in": self.status_list}})
        # Report the count, as the other deletion migrations in this module do.
        logging.info(f"{result.deleted_count} deleted jobs")

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Raise ``ValueError`` if any job with a status in ``self.status_list`` is still present."""
        logging.info("Check that jobs with status list dont exist")
        db = get_db(self.MONGOENGINE_ALIAS)
        if db[self.COLLECTION_JOBS].count_documents({"status": {"$in": self.status_list}}):
            raise ValueError(f"Found documents with status in {self.status_list}")
class MigrationRemoveFieldFromJob(BaseQueueMigration):
    """Migration that unsets one field on every document of the jobs collection.

    The migration cannot be rolled back.
    """

    def __init__(self, field_name: str, version: str, description: str):
        """Register the migration and remember the field to remove."""
        super().__init__(version=version, description=description)
        self.field_name = field_name

    def up(self) -> None:
        """Apply ``$unset`` for ``self.field_name`` with an empty filter, i.e. to all jobs."""
        logging.info(f"Removing '{self.field_name}' field.")
        jobs = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_JOBS]
        match_everything: dict[str, Any] = {}
        jobs.update_many(match_everything, {"$unset": {self.field_name: ""}})

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Run ``check_documents`` on ``JobDocument`` with a sample size of 10."""
        logging.info(f"Ensure that a random selection of jobs don't have '{self.field_name}' field")
        # NOTE(review): the field name is not passed to the check; presumably the document
        # class rejects the removed field when loading - confirm against check_documents.
        check_documents(DocCls=JobDocument, sample_size=10)
class MigrationRemoveFieldFromCache(BaseCacheMigration):
    """Migration that unsets one field on every document of the cache responses collection.

    The migration cannot be rolled back.
    """

    def __init__(self, field_name: str, version: str, description: Optional[str] = None):
        """Register the migration, generating a description when none (or an empty one) is given."""
        resolved_description = description or f"remove '{field_name}' field from cache"
        super().__init__(version=version, description=resolved_description)
        self.field_name = field_name

    def up(self) -> None:
        """Apply ``$unset`` for ``self.field_name`` with an empty filter, i.e. to all cache entries."""
        logging.info(f"Removing '{self.field_name}' field.")
        responses = get_db(self.MONGOENGINE_ALIAS)[self.COLLECTION_RESPONSES]
        match_everything: dict[str, Any] = {}
        responses.update_many(match_everything, {"$unset": {self.field_name: ""}})

    def down(self) -> None:
        """Always raise ``IrreversibleMigrationError``: rollback is not supported."""
        raise IrreversibleMigrationError("This migration does not support rollback")

    def validate(self) -> None:
        """Run ``check_documents`` on ``CachedResponseDocument`` with a sample size of 10."""
        logging.info(f"Ensure that a random selection of documents don't have '{self.field_name}' field")
        # NOTE(review): the field name is not passed to the check; presumably the document
        # class rejects the removed field when loading - confirm against check_documents.
        check_documents(DocCls=CachedResponseDocument, sample_size=10)