def run_job()

in jobs/cache_maintenance/src/cache_maintenance/main.py [0:0]


def _is_available(resource: "CacheMongoResource | QueueMongoResource", database_name: str) -> bool:
    """Return True if ``resource`` is reachable; otherwise log that the action is skipped and return False.

    Args:
        resource: an opened cache or queue Mongo resource.
        database_name: short label ("cache" or "queue") inserted into the warning message.
    """
    if resource.is_available():
        return True
    logging.warning(
        "The connection to the %s database could not be established. The action is skipped.", database_name
    )
    return False


def _build_storage_clients(job_config: JobConfig) -> list[StorageClient]:
    """Build the storage clients passed to the backfill actions: cached assets first, then assets."""
    return [
        StorageClient(
            protocol=storage_config.storage_protocol,
            storage_root=storage_config.storage_root,
            base_url=storage_config.base_url,
            s3_config=job_config.s3,
            # no need to specify a url_signer
        )
        for storage_config in (job_config.cached_assets, job_config.assets)
    ]


def run_job() -> None:
    """Run the cache-maintenance action selected in the job configuration.

    The configuration is read from the environment. If no action is set, a warning is logged
    and the function returns. Otherwise the cache and queue Mongo resources are opened and the
    action is dispatched:

    - ``backfill`` / ``backfill-retryable-errors``: need both databases; call
      ``backfill_all_datasets`` / ``backfill_retryable_errors`` respectively.
    - ``collect-queue-metrics``: needs the queue database.
    - ``collect-cache-metrics``: needs the cache database.
    - ``post-messages``: needs the cache database.
    - ``skip``: does nothing.

    Any other action is logged as unsupported. If a required database is unreachable, a warning
    is logged and the function returns without running the action (and without logging a
    duration). Otherwise the elapsed time is logged at INFO level.
    """
    job_config = JobConfig.from_env()
    # Configure logging before emitting anything, so that every message (including the
    # "no action" warning below) goes through the configured level/handlers.
    init_logging(level=job_config.log.level)

    action = job_config.action
    if not action:
        logging.warning("No action mode was selected, skipping tasks.")
        return

    with (
        CacheMongoResource(
            database=job_config.cache.mongo_database, host=job_config.cache.mongo_url
        ) as cache_resource,
        QueueMongoResource(
            database=job_config.queue.mongo_database, host=job_config.queue.mongo_url
        ) as queue_resource,
    ):
        start_time = datetime.now()
        if action in ("backfill", "backfill-retryable-errors"):
            # `and` short-circuits: the queue is only checked (and reported) once the cache is reachable.
            if not (_is_available(cache_resource, "cache") and _is_available(queue_resource, "queue")):
                return
            storage_clients = _build_storage_clients(job_config)
            if action == "backfill":
                backfill_all_datasets(
                    hf_endpoint=job_config.common.hf_endpoint,
                    hf_token=job_config.common.hf_token,
                    blocked_datasets=job_config.common.blocked_datasets,
                    storage_clients=storage_clients,
                )
            else:
                backfill_retryable_errors(
                    hf_endpoint=job_config.common.hf_endpoint,
                    hf_token=job_config.common.hf_token,
                    blocked_datasets=job_config.common.blocked_datasets,
                    storage_clients=storage_clients,
                )
        elif action == "collect-queue-metrics":
            if not _is_available(queue_resource, "queue"):
                return
            collect_queue_metrics()
            collect_worker_size_jobs_count()
        elif action == "collect-cache-metrics":
            if not _is_available(cache_resource, "cache"):
                return
            collect_cache_metrics()
        elif action == "post-messages":
            if not _is_available(cache_resource, "cache"):
                return
            post_messages(
                hf_endpoint=job_config.common.hf_endpoint,
                bot_associated_user_name=job_config.discussions.bot_associated_user_name,
                bot_token=job_config.discussions.bot_token,
                parquet_revision=job_config.discussions.parquet_revision,
            )
        elif action == "skip":
            pass
        else:
            logging.warning("Action '%s' is not supported.", action)

        end_time = datetime.now()
        logging.info("Duration: %s", end_time - start_time)