src/dma/collector/query_managers/mysql.py (59 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import aiosql

from dma.collector.query_managers.base import CollectionQueryManager
from dma.lib.exceptions import ApplicationError
from dma.utils import module_to_os_path

if TYPE_CHECKING:
    from aiosql.queries import Queries

_root_path = module_to_os_path("dma")


def _mysql_version_prefix(db_version: str | None) -> str:
    """Return the query-name prefix that matches the given database version.

    Args:
        db_version: Version string reported by the database, or ``None`` when
            it has not been set yet.

    Returns:
        ``"base"`` when the leading digit of the version is greater than 5,
        otherwise ``"5"``.

    Raises:
        ApplicationError: If ``db_version`` is ``None``.
        ValueError: If the first character of ``db_version`` is not a digit
            (including an empty string).
    """
    if db_version is None:
        msg = "Database Version was not set. Ensure the initialization step complete successfully."
        raise ApplicationError(msg)
    # NOTE(review): only the first character is inspected, so a two-digit major
    # version such as "10.x" is read as 1 and is given the "5" query set.
    # Confirm that this is intended before widening the parse.
    major_version = int(db_version[:1])
    # The major version is a single integer digit, so "> 5" is the exact
    # integer form of the previous "> 5.8" float comparison.
    return "base" if major_version > 5 else "5"


def _mysql_collection_filenames(version_prefix: str) -> dict[str, str]:
    """Map every MySQL collection query name to its output file name.

    This mapping is the single source for both the query set and the file
    names, so the two cannot drift apart.

    Args:
        version_prefix: Prefix (``"base"`` or ``"5"``) used to select the
            version-specific resource-group and process-list queries.

    Returns:
        A dictionary keyed by query name whose values are output file names.
    """
    return {
        f"collection_mysql_{version_prefix}_resource_groups": "mysql_resource_groups",
        f"collection_mysql_{version_prefix}_process_list": "mysql_process_list",
        "collection_mysql_config": "mysql_config",
        "collection_mysql_data_types": "mysql_data_types",
        "collection_mysql_database_details": "mysql_database_details",
        "collection_mysql_engines": "mysql_engines",
        "collection_mysql_plugins": "mysql_plugins",
        "collection_mysql_schema_objects": "mysql_schema_objects",
        "collection_mysql_table_details": "mysql_table_details",
        "collection_mysql_users": "mysql_users",
    }


class MySQLCollectionQueryManager(CollectionQueryManager):
    """Collection query manager for MySQL sources.

    Selects the set of collection queries (and their output file names) that
    matches the version stored in ``self.db_version``.
    """

    def __init__(
        self,
        connection: Any,
        execution_id: str | None = None,
        source_id: str | None = None,
        manual_id: str | None = None,
        # NOTE: this default is evaluated once, when the class body executes,
        # so the SQL files are loaded at import time and every instance that
        # relies on the default shares the same ``Queries`` object.
        queries: Queries = aiosql.from_path(
            sql_path=f"{_root_path}/collector/sql/sources/mysql",
            driver_adapter="asyncmy",
        ),
    ) -> None:
        """Initialise the manager by forwarding all arguments to the base class.

        Args:
            connection: Database connection handed to the base class.
            execution_id: Optional identifier forwarded to the base class.
            source_id: Optional identifier forwarded to the base class.
            manual_id: Optional identifier forwarded to the base class.
            queries: Loaded query collection; defaults to the MySQL SQL
                sources bundled under the ``dma`` package.
        """
        super().__init__(
            connection=connection,
            queries=queries,
            execution_id=execution_id,
            source_id=source_id,
            manual_id=manual_id,
        )

    def get_collection_queries(self) -> set[str]:
        """Return the names of the collection queries for this database version.

        Raises:
            ApplicationError: If ``self.db_version`` has not been set.
        """
        version_prefix = _mysql_version_prefix(self.db_version)
        return set(_mysql_collection_filenames(version_prefix))

    def get_collection_filenames(self) -> dict[str, str]:
        """Return a mapping of collection query name to output file name.

        Raises:
            ApplicationError: If ``self.db_version`` has not been set.
        """
        version_prefix = _mysql_version_prefix(self.db_version)
        return _mysql_collection_filenames(version_prefix)