in db/mysql_scanner.py [0:0]
def scan(self, rds_info, output):
"""Connects to MySQL database and collects data.
Args:
rds_info: Dictionary object with database connection information
output: Dictionary object to store the collected data
Returns:
True if collection is successful
False otherwise
"""
queries = self.get_queries()
collection = {}
try:
conn = mysql.connector.connect(
host=rds_info.host,
user=rds_info.username,
passwd=rds_info.password,
port=rds_info.port,
database=rds_info.dbname)
for query in queries:
try:
if self.cannot_process_query(query.query_type):
continue
cur = conn.cursor()
cur.execute(query.query)
row_headers = [x[0] for x in cur.description]
query_results = cur.fetchall()
if query.query_type == "MySQL_Version":
version = query_results[0][0]
self.set_version(version)
collection["version()"] = version
output["version"] = version
else:
result_array = []
for result in query_results:
result_array.append(dict(zip(row_headers, result)))
collection[query.query_type] = result_array
except Exception as ex: # pylint: disable=broad-except
if query.query_type == "MySQL_Version":
raise ex
logging.error("Failed to run %s", query.query_type)
logging.error(ex)
output["MySQL"] = collection
return True
except Exception as e: # pylint: disable=broad-except
logging.error("Received an unexpected error")
logging.error(e)
return False