in db/rds_scanner.py [0:0]
def scan(self, secret_config, secret_region):
    """Connects to an RDS database, collects data and writes it to disk.

    Connection settings are read from the Secrets Manager secret named by
    ``secret_config['name']``; individual fields can be overridden through
    ``secret_config``. The collected output is serialised as JSON to
    ``./output/services/db_<md5 of host+port+dbname>.json``.

    Args:
      secret_config: Mapping describing the credential secret. ``name`` is
        required. ``username``, ``engine``, ``host``, ``port``, ``dbname``
        and ``dbInstanceIdentifier`` are optional and, when present, take
        precedence over the values stored in the secret. The password is
        always taken from the secret.
      secret_region: Region used for the Secrets Manager lookup, the RDS
        client and the engine-specific scanner.
    Returns:
      True if collection is successful
      False otherwise (unsupported engine, scanner failure, or any
      exception, which is logged rather than propagated)
    """
    try:
        secrets_manager = SecretsManager(secret_region)
        secret = json.loads(secrets_manager.get_secret(secret_config['name']))

        rds_info = DBConnection()
        # Only consult the secret for the username when the config does not
        # override it, so a secret lacking 'username' is not fatal when an
        # override is supplied.
        if 'username' in secret_config:
            rds_info.username = secret_config['username']
        else:
            rds_info.username = secret['username']
        rds_info.password = secret['password']
        rds_info.engine = secret_config.get('engine', secret.get('engine'))
        rds_info.host = secret_config.get('host', secret.get('host'))
        rds_info.port = secret_config.get('port', secret.get('port'))
        rds_info.dbname = secret_config.get('dbname', secret.get('dbname'))

        # Fall back to a per-engine default database when none is configured.
        default_db = {
            'sqlserver': 'master', 'mysql': 'mysql', 'postgres': 'postgres'
        }
        if not rds_info.dbname:
            rds_info.dbname = default_db.get(rds_info.engine)

        session = boto3.session.Session()
        client = session.client(service_name='rds', region_name=secret_region)
        db_instance_helper = DBInstanceHelper()
        db_instance_identifier = secret_config.get(
            'dbInstanceIdentifier', secret.get('dbInstanceIdentifier'))
        instance_details = db_instance_helper.get_image_size_details(
            client, db_instance_identifier, secret_region)

        output = {
            'instanceDetails': instance_details,
            'hostName': rds_info.host,
            'databaseName': rds_info.dbname,
            'port': rds_info.port
        }

        # Select the engine-specific scanner and its numeric dbType code.
        if rds_info.engine == 'mysql':
            output['dbType'] = 4
            scanner = MysqlScanner(secret_region)
        elif rds_info.engine == 'postgres':
            output['dbType'] = 1
            scanner = PostgreSQLScanner(secret_region)
        elif rds_info.engine == 'sqlserver':
            output['dbType'] = 2
            scanner = SqlServerScanner(secret_region)
        else:
            # Without this branch `scanner` would be unbound and the failure
            # would surface as an opaque UnboundLocalError.
            logging.error('Unsupported database engine: %s', rds_info.engine)
            return False

        if not scanner.scan(rds_info, output):
            return False

        # default=str stringifies any value json cannot serialise natively.
        json_object = json.dumps(output, default=str)
        # The digest only derives a stable file name per host/port/dbname.
        file_hash = hashlib.md5((rds_info.host + str(rds_info.port) +
                                 rds_info.dbname).encode('utf-8')).hexdigest()
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs('./output/services/', exist_ok=True)
        with open('./output/services/db_' + file_hash + '.json', 'w',
                  encoding='utf-8') as outfile:
            outfile.write(json_object)
        return True
    except Exception as e:  # pylint: disable=broad-except
        logging.error('Received an unexpected error')
        # logging.exception keeps the traceback that logging.error(e) dropped.
        logging.exception(e)
        return False