db/mysql_scanner.py (82 lines of code) (raw):
"""Copyright 2021 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
version 1.5.1
"""
import logging
from db.db_info import Query
import mysql.connector
class MysqlScanner:
"""Helper class to scan MySQL database."""
def __init__(self, region):
self.major_version = 0
self.minor_version = 0
self.region = region
def set_version(self, version):
version_parts = version.split(".")
if len(version_parts) >= 2:
self.major_version = int(version_parts[0])
self.minor_version = int(version_parts[1])
def cannot_process_query(self, query_type):
"""Checks whether the given query_type can be executed for the given version of MySQL.
Args:
query_type: query type enum value
Returns:
True if the query can be executed
False otherwise
"""
if (query_type == "MySQL_UsersWithEmptyPasswords5_6" and
(self.major_version > 5 or
(self.major_version == 5 and self.minor_version > 6))):
return True
if (query_type == "MySQL_UsersWithEmptyPasswords5_7" and
(self.major_version < 5 or
(self.major_version == 5 and self.minor_version < 7))):
return True
return False
def scan(self, rds_info, output):
"""Connects to MySQL database and collects data.
Args:
rds_info: Dictionary object with database connection information
output: Dictionary object to store the collected data
Returns:
True if collection is successful
False otherwise
"""
queries = self.get_queries()
collection = {}
try:
conn = mysql.connector.connect(
host=rds_info.host,
user=rds_info.username,
passwd=rds_info.password,
port=rds_info.port,
database=rds_info.dbname)
for query in queries:
try:
if self.cannot_process_query(query.query_type):
continue
cur = conn.cursor()
cur.execute(query.query)
row_headers = [x[0] for x in cur.description]
query_results = cur.fetchall()
if query.query_type == "MySQL_Version":
version = query_results[0][0]
self.set_version(version)
collection["version()"] = version
output["version"] = version
else:
result_array = []
for result in query_results:
result_array.append(dict(zip(row_headers, result)))
collection[query.query_type] = result_array
except Exception as ex: # pylint: disable=broad-except
if query.query_type == "MySQL_Version":
raise ex
logging.error("Failed to run %s", query.query_type)
logging.error(ex)
output["MySQL"] = collection
return True
except Exception as e: # pylint: disable=broad-except
logging.error("Received an unexpected error")
logging.error(e)
return False
def get_queries(self):
"""Gets a list of data collection queries.
Returns:
List of data collection queries
"""
version_query = "select version() as version"
return [
Query("MySQL_Version", version_query),
Query("MySQL_VersionComment", """
select @@version_comment
"""),
Query("MySQL_DataDir", """
select @@datadir
"""),
Query("MySQL_Plugins", """
SHOW PLUGINS
"""),
Query(
"MySQL_SizeByStorageEngine", """
select /*+ MAX_EXECUTION_TIME(5000) */ ENGINE AS Storage_Engine, COUNT(*) Tables_Count,
ROUND(SUM(data_length) / (1024*1024*1024),2) Data_Size,
ROUND(SUM(index_length)/ (1024*1024*1024),2) Index_Size
FROM information_schema.TABLES
WHERE ENGINE IS NOT NULL
AND table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
GROUP BY ENGINE
"""),
Query(
"MySQL_TablesWithNoPK", """
select /*+ MAX_EXECUTION_TIME(5000) */ tables.table_schema, tables.table_name, tables.table_rows
FROM information_schema.tables
LEFT JOIN (
SELECT table_schema, table_name
FROM information_schema.statistics
GROUP BY table_schema, table_name, index_name
HAVING SUM(CASE WHEN non_unique = 0 AND nullable != 'YES' THEN 1 ELSE 0 END) = COUNT(*)
) puks ON tables.table_schema = puks.table_schema AND tables.table_name = puks.table_name
WHERE puks.table_name IS NULL
AND tables.table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
AND tables.table_type = 'BASE TABLE'
"""),
Query(
"MySQL_GlobalVariables", """
SHOW GLOBAL STATUS WHERE VARIABLE_NAME IN ('THREADS_CONNECTED','THREADS_RUNNING', 'QUERIES')
"""),
]