"""Copyright 2021 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
version 1.5.1
"""
import argparse
from concurrent.futures import ThreadPoolExecutor, wait
import csv
import datetime
import json
import logging
import os
import sys
import time
import zipfile
import signal
import concurrent.futures.thread
import boto3
from pkg_resources import parse_version as version
import stratozonedict
import aws_resource_scan
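# Local helper modules: stratozonedict supplies the output-row dictionary
# templates (vm_basic_info, vm_disk, vm_tag, vm_perf) and aws_resource_scan
# implements the managed-services resource scan.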
# global variables
vm_list = []
vm_tag_list = []
vm_disk_list = []
vm_perf_list = []
region_list = []
run_script = True
start = time.time()
# Initiate the parser
parser = argparse.ArgumentParser()
parser.add_argument(
'-m',
'--collection_mode',
    help='Choose whether to run virtual machine collection or managed services collection.',
choices=['VirtualMachine', 'ManagedService'],
default='VirtualMachine')
parser.add_argument(
'-n',
'--no_perf',
    help='Do not collect performance data.',
action='store_true')
parser.add_argument(
'-t',
'--thread_limit',
help='Number of threads for performance collection.',
type=int,
default=30)
parser.add_argument(
'-p',
'--no_public_ip',
    help='Do not collect public IP addresses.',
action='store_true')
parser.add_argument(
    '-r',
    '--resources',
    help='Deployed resource collection mode '
         '(default: "basic"; use "none" to skip).',
    dest='resources',
    action='store',
    default='basic')
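# Illustrative invocations (values are examples only):
#   python3 stratozone-aws-export.py                    # VMs + performance
#   python3 stratozone-aws-export.py -n -p              # inventory only, no public IPs
#   python3 stratozone-aws-export.py -m ManagedService  # managed-services scan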
def handler_stop_signals(signum, frame):
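  """Stop the collection loop and exit on SIGINT/SIGTERM."""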
global run_script
run_script = False
print('Exiting application')
sys.exit(0)
def create_directory(dir_name):
"""Create output directory.
Args:
dir_name: Destination directory
"""
try:
if not os.path.exists(dir_name):
os.makedirs(dir_name)
except Exception as e:
logging.error('error in create_directory')
logging.error(e)
def get_image_info(image_id, l_vm_instance):
"""Get source image info.
Args:
image_id: ID of the source image
l_vm_instance: instance dictionary object
Returns:
Dictionary object.
"""
try:
disk_image = client.describe_images(ImageIds=[image_id,]).get('Images')
if len(disk_image) > 0:
l_vm_instance['OsType'] = disk_image[0].get('PlatformDetails')
l_vm_instance['OsPublisher'] = disk_image[0].get('Description')
else:
l_vm_instance['OsType'] = 'unknown'
l_vm_instance['OsPublisher'] = 'unknown'
return l_vm_instance
except Exception as e:
logging.error('error in get_image_info')
logging.error(e)
l_vm_instance['OsType'] = 'unknown'
l_vm_instance['OsPublisher'] = 'unknown'
return l_vm_instance
def get_image_size_details(instance_type, l_vm_instance):
"""Get image size details.
Args:
instance_type: instance type
l_vm_instance: instance dictionary object
Returns:
Dictionary object.
"""
instance_type_info = (
client.describe_instance_types(
InstanceTypes=[instance_type,]).get('InstanceTypes'))
  l_vm_instance['MemoryGiB'] = '{:.1f}'.format(
      instance_type_info[0]['MemoryInfo']['SizeInMiB'] / 1024)
l_vm_instance['AllocatedProcessorCoreCount'] = (
instance_type_info[0]['VCpuInfo']['DefaultVCpus'])
return l_vm_instance
def report_writer(dictionary_data, field_name_list, file_name,
                  directory='./output/vm/'):
  """Write data contained in a dictionary list into a CSV file.
  Args:
    dictionary_data: list of dictionary objects, one per row
    field_name_list: column names
    file_name: file name to be created
    directory: parent directory
  """
try:
logging.info('Writing %s to the disk', file_name)
with open(directory + file_name, 'w', newline='') as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=field_name_list, extrasaction='ignore')
writer.writeheader()
for dictionary_value in dictionary_data:
writer.writerow(dictionary_value)
except Exception as e:
logging.error('error in report_writer')
logging.error(e)
def generate_disk_data(vm_id):
"""If no disk is found generate disk data to prevent import errors.
Args:
vm_id: Instance ID
"""
disk = stratozonedict.vm_disk.copy()
disk['MachineId'] = vm_id
disk['DiskLabel'] = '/dev/xvda'
disk['SizeInGib'] = '52.5'
disk['StorageTypeLabel'] = 'gp2'
vm_disk_list.append(disk)
def get_disk_info(vm_id, block_device_list, root_device_name):
"""Get attached disk data.
Args:
vm_id: Instance ID
block_device_list: list of attached disks
root_device_name: name of the primary (OS) disk
Returns:
Disk create date.
"""
disk_count = 0
try:
disk_create_date = datetime.datetime.now()
for block_device in block_device_list:
disk = stratozonedict.vm_disk.copy()
volume = client.describe_volumes(
VolumeIds=[block_device['Ebs']['VolumeId'],]).get('Volumes')
disk['MachineId'] = vm_id
disk['DiskLabel'] = block_device['DeviceName']
disk['SizeInGib'] = volume[0]['Size']
disk['StorageTypeLabel'] = volume[0]['VolumeType']
vm_disk_list.append(disk)
disk_count = disk_count + 1
if root_device_name == block_device['DeviceName']:
disk_create_date = block_device['Ebs']['AttachTime']
if disk_count == 0:
generate_disk_data(vm_id)
return disk_create_date
except Exception as e:
if disk_count == 0:
generate_disk_data(vm_id)
logging.error('error in get_disk_info')
logging.error(e)
return disk_create_date
def get_network_interface_info(interface_list, l_vm_instance):
"""Get network interface data.
Args:
interface_list: List of network interfaces
l_vm_instance: instance dictionary object
"""
try:
ip_list = []
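    # Treat the first interface in the list as the machine's primary NIC.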
for nic_count, interface in enumerate(interface_list):
if nic_count == 0:
l_vm_instance['PrimaryIPAddress'] = interface['PrivateIpAddress']
l_vm_instance['PrimaryMACAddress'] = interface['MacAddress']
ip_list.append(interface['PrivateIpAddress'])
if not args.no_public_ip:
if 'Association' in interface:
if len(interface['Association']['PublicIp']) > 0:
l_vm_instance['PublicIPAddress'] = (
interface['Association']['PublicIp'])
ip_list.append(interface['Association']['PublicIp'])
l_vm_instance['IpAddressListSemiColonDelimited'] = (';'.join(ip_list))
except Exception as e:
logging.error('error in get_network_interface_info')
logging.error(e)
def get_instance_tags(vm_id, tag_dictionary, l_vm_instance):
"""Get tags assigned to instance.
Args:
vm_id: Instance ID
tag_dictionary: list of assigned tags
l_vm_instance: instance dictionary object
Returns:
Dictionary object.
"""
try:
    # If no Name tag is assigned, use the instance ID as the machine name.
l_vm_instance['MachineName'] = vm_id
for tag in tag_dictionary:
tmp_tag = stratozonedict.vm_tag.copy()
tmp_tag['MachineId'] = vm_id
tmp_tag['Key'] = tag['Key']
tmp_tag['Value'] = tag['Value']
if tag['Key'] == 'Name':
l_vm_instance['MachineName'] = tag['Value']
vm_tag_list.append(tmp_tag)
return l_vm_instance
except Exception as e:
logging.error('error in get_instance_tags')
logging.error(e)
return l_vm_instance
def get_metric_data_query(namespace, metric_name,
dimension_name, dimension_value, unit, query_id=''):
"""Get performance metrics JSON query for the VM.
Args:
namespace: Query Namespace
metric_name: Metric name
dimension_name: Dimension name
dimension_value: Dimension value
unit: Unit of measure
query_id: Optional unique ID for the query
Returns:
Formatted JSON query.
"""
if not query_id:
query_id = metric_name.lower()
data_query = {
'Id': query_id,
'MetricStat': {
'Metric': {
'Namespace': namespace,
'MetricName': metric_name,
'Dimensions': [
{
'Name': dimension_name,
'Value': dimension_value
},]
},
'Period': 1800,
'Stat': 'Average',
'Unit': unit
},
'ReturnData': True,
}
return data_query
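# Example (illustrative instance ID): get_metric_data_query('AWS/EC2',
# 'CPUUtilization', 'InstanceId', 'i-0123456789abcdef0', 'Percent')
# produces a GetMetricData query averaging over 1800-second periods.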
def get_performance_info(vm_id, region_name, block_device_list):
"""Query system for VM performance data.
Args:
vm_id: instance id.
region_name: name of the AWS region
block_device_list: list of devices (disks) attached to the vm
"""
try:
perf_client = boto3.client('cloudwatch', region_name)
perf_queries = []
global vm_perf_list
disk_count = 0
perf_queries.append(get_metric_data_query('AWS/EC2', 'CPUUtilization',
'InstanceId', vm_id, 'Percent'))
perf_queries.append(get_metric_data_query('AWS/EC2', 'NetworkOut',
'InstanceId', vm_id,
'Bytes'))
perf_queries.append(get_metric_data_query('AWS/EC2', 'NetworkIn',
'InstanceId', vm_id, 'Bytes'))
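    # Queue a VolumeReadOps/VolumeWriteOps query pair for every attached volume.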
for block_device in block_device_list:
perf_queries.append(get_metric_data_query('AWS/EBS', 'VolumeReadOps',
'VolumeId',
block_device,
'Count',
'volumereadops'
+ str(disk_count)))
perf_queries.append(get_metric_data_query('AWS/EBS', 'VolumeWriteOps',
'VolumeId',
block_device,
'Count',
'volumewriteops'
+ str(disk_count)))
disk_count = disk_count + 1
response = perf_client.get_metric_data(
MetricDataQueries=perf_queries,
StartTime=datetime.datetime.utcnow() - datetime.timedelta(days=30),
EndTime=datetime.datetime.utcnow(),
ScanBy='TimestampAscending'
)
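    # The code assumes MetricDataResults come back in query order:
    # [0] CPUUtilization, [1] NetworkOut, [2] NetworkIn, then a
    # read-ops/write-ops pair for each attached volume.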
first_arr_size = len(response['MetricDataResults'][0]['Values'])
if (len(response['MetricDataResults'][1]['Values']) >= first_arr_size and
len(response['MetricDataResults'][2]['Values']) >= first_arr_size and
len(response['MetricDataResults'][3]['Values']) >= first_arr_size):
for i in range(0, first_arr_size):
vm_perf_info = stratozonedict.vm_perf.copy()
vm_perf_info['MachineId'] = vm_id
        vm_perf_info['TimeStamp'] = get_formatted_datetime(
            response['MetricDataResults'][0]['Timestamps'][i])
vm_perf_info['CpuUtilizationPercentage'] = '{:.2f}'.format(
response['MetricDataResults'][0]['Values'][i])
vm_perf_info['NetworkBytesPerSecSent'] = '{:.4f}'.format(
response['MetricDataResults'][1]['Values'][i])
vm_perf_info['NetworkBytesPerSecReceived'] = '{:.4f}'.format(
response['MetricDataResults'][2]['Values'][i])
tmp_read_io = 0
tmp_write_io = 0
        for j in range(0, disk_count):
          # Read/write queries were appended in pairs per volume, so
          # volume j's results sit at indexes 3 + 2*j (reads) and
          # 4 + 2*j (writes).
          tmp_read_io = tmp_read_io + (
              response['MetricDataResults'][3 + 2 * j]['Values'][i])
          tmp_write_io = tmp_write_io + (
              response['MetricDataResults'][4 + 2 * j]['Values'][i])
        # Convert the per-period (1800 s) operation counts to ops/sec.
        vm_perf_info['DiskReadOperationsPerSec'] = '{:.4f}'.format(
            tmp_read_io / 1800)
        vm_perf_info['DiskWriteOperationsPerSec'] = '{:.4f}'.format(
            tmp_write_io / 1800)
vm_perf_info['AvailableMemoryBytes'] = ''
vm_perf_info['MemoryUtilizationPercentage'] = ''
vm_perf_list.append(vm_perf_info)
except Exception as e:
logging.error('error in get_performance_info')
logging.error(e)
def display_script_progress():
"""Display collection progress."""
try:
sys.stdout.write('\r')
sys.stdout.write('%s[%s%s] %i/%i\r' % ('Regions: ', '#'*region_counter,
'.'*(total_regions-region_counter),
region_counter, total_regions))
sys.stdout.flush()
except Exception as e:
logging.error('error in display_script_progress')
logging.error(e)
def region_is_available(l_region):
"""Check if region is enabled.
Args:
l_region: name of the region
  Returns:
    True if the region is enabled for the account, False otherwise.
"""
regional_sts = boto3.client('sts', l_region)
try:
regional_sts.get_caller_identity()
return True
except Exception as e:
logging.error('error in region_is_available')
logging.error(e)
return False
def zip_files(dir_name, zip_file_name):
"""Compress generated files into zip file for import into stratozone.
Args:
dir_name: source directory
zip_file_name: name of the file to be created
"""
csv_filter = lambda name: 'csv' in name or 'json' in name
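  # Package only the .csv/.json outputs; the collection log stays out.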
if os.path.exists(zip_file_name):
os.remove(zip_file_name)
with zipfile.ZipFile(zip_file_name, 'w') as zip_obj:
# Iterate over all the files in directory
for folder_name, sub_folder, file_names in os.walk(dir_name):
for file_name in file_names:
if csv_filter(file_name):
file_path = os.path.join(folder_name, file_name)
zip_obj.write(file_path, os.path.basename(file_path))
def get_formatted_datetime(dt):
  """Convert a datetime value to a string with an explicit UTC offset.
  Args:
    dt: datetime value
  Returns:
    Formatted datetime string.
  """
  if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
    # If timezone info is not available, assume the value is in UTC.
    return dt.strftime('%Y-%m-%d %H:%M:%S+00:00')
  else:
    # Insert a colon into the numeric UTC offset (e.g. +0000 -> +00:00).
    dt_str = dt.strftime('%Y-%m-%d %H:%M:%S%z').replace('Z', '+00:00')
    dt_str = '{0}:{1}'.format(dt_str[:-2], dt_str[-2:])
    return dt_str
###########################################################################
# Collect information about deployed instances
###########################################################################
signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)
# Read arguments from the command line
args = parser.parse_args()
if version(boto3.__version__) < version('1.20.20'):
  print('You are using a version of the AWS Python SDK (boto3) that is '
        'too old.'
        '\nVersion installed: {}'
        '\nPlease upgrade to the latest version.'
        '\nhttps://boto3.amazonaws.com/v1/documentation/api/'
        'latest/guide/quickstart.html'.format(boto3.__version__))
exit()
while run_script:
# create output and log directory
create_directory('./output/vm')
create_directory('./output/services')
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(filename='./output/stratozone-aws-export.log',
format=log_format,
level=logging.ERROR)
logging.debug('Starting collection at: %s', datetime.datetime.now())
ec2_client = boto3.client('ec2')
logging.info('Get all regions')
regions = ec2_client.describe_regions(AllRegions=True)
  region_list = list(map(lambda x: x['RegionName'], regions['Regions']))
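  # AllRegions=True also returns regions that are not opted in, so VM
  # collection checks each one with region_is_available() before scanning.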
if args.collection_mode == 'VirtualMachine':
    logging.info('Starting virtual machine collection')
region_counter = 0
total_regions = len(regions['Regions'])
    # Loop through all regions; for each region get the list of deployed VMs
    # and process each VM, retrieving basic data and performance metrics.
for region in regions['Regions']:
region_counter += 1
if not region_is_available(region['RegionName']):
continue
client = boto3.client('ec2', region['RegionName'])
display_script_progress()
specific_instance = client.describe_instances()
for reservation in specific_instance['Reservations']:
for instance in reservation['Instances']:
if instance.get('State').get('Name') == 'terminated':
continue
vm_instance = stratozonedict.vm_basic_info.copy()
vm_instance['MachineId'] = instance.get('InstanceId')
vm_instance['HostingLocation'] = region.get('RegionName')
vm_instance['MachineTypeLabel'] = instance.get('InstanceType')
vm_instance['MachineStatus'] = instance.get('State').get('Name')
vm_instance = get_image_info(instance.get('ImageId'), vm_instance)
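          # Fall back to PlatformDetails when the source AMI is no longer
          # available (e.g. a deregistered image).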
if vm_instance['OsType'] == 'unknown':
tmp_os_value = 'Linux'
if ('windows' in instance.get('PlatformDetails').lower() or
'sql' in instance.get('PlatformDetails').lower()):
tmp_os_value = 'Windows'
vm_instance['OsType'] = tmp_os_value
vm_instance['OsPublisher'] = tmp_os_value
vm_instance = get_image_size_details(instance.get('InstanceType'),
vm_instance)
if 'Tags' in instance:
vm_instance = get_instance_tags(instance.get('InstanceId'),
instance['Tags'],
vm_instance)
else:
vm_instance['MachineName'] = vm_instance['MachineId']
if 'NetworkInterfaces' in instance:
get_network_interface_info(instance['NetworkInterfaces'],
vm_instance)
          disk_id_list = []
          # Instance-store mappings have no 'Ebs' entry, so guard the lookup.
          for block_device_mapping in instance['BlockDeviceMappings']:
            if 'Ebs' in block_device_mapping:
              disk_id_list.append(block_device_mapping['Ebs']['VolumeId'])
vm_create_timestamp = get_disk_info(instance['InstanceId'],
instance['BlockDeviceMappings'],
instance['RootDeviceName'])
vm_instance['CreateDate'] = get_formatted_datetime(vm_create_timestamp)
vm_instance['DiskIDs'] = disk_id_list
vm_list.append(vm_instance)
if not args.no_perf:
processes = []
print('Inventory collection completed.'
' Collecting performance using {} threads'.format(args.thread_limit))
with ThreadPoolExecutor(max_workers=args.thread_limit) as executor:
try:
for cvm in vm_list:
processes.append(executor.submit(get_performance_info,
cvm['MachineId'],
cvm['HostingLocation'],
cvm['DiskIDs']))
        except KeyboardInterrupt:
          # Clear the executor's internal work queues so shutdown does not
          # block on pending performance queries.
          executor._threads.clear()
          concurrent.futures.thread._threads_queues.clear()
          sys.exit()
wait(processes)
# write collected data to files
    created_files = 2  # vmInfo.csv and diskInfo.csv are always written.
field_names = ['MachineId', 'MachineName',
'PrimaryIPAddress', 'PrimaryMACAddress',
'PublicIPAddress', 'IpAddressListSemiColonDelimited',
'TotalDiskAllocatedGiB', 'TotalDiskUsedGiB',
'MachineTypeLabel', 'AllocatedProcessorCoreCount',
'MemoryGiB', 'HostingLocation', 'OsType',
'OsPublisher', 'OsName', 'OsVersion', 'MachineStatus',
'ProvisioningState', 'CreateDate', 'IsPhysical', 'Source']
report_writer(vm_list, field_names, 'vmInfo.csv')
    if vm_tag_list:
      field_names = ['MachineId', 'Key', 'Value']
      report_writer(vm_tag_list, field_names, 'tagInfo.csv')
      created_files += 1
field_names = ['MachineId', 'DiskLabel', 'SizeInGib', 'UsedInGib',
'StorageTypeLabel']
report_writer(vm_disk_list, field_names, 'diskInfo.csv')
field_names = ['MachineId', 'TimeStamp', 'CpuUtilizationPercentage',
                   'MemoryUtilizationPercentage', 'AvailableMemoryBytes',
'DiskReadOperationsPerSec', 'DiskWriteOperationsPerSec',
'NetworkBytesPerSecSent', 'NetworkBytesPerSecReceived']
    if not args.no_perf:
      report_writer(vm_perf_list, field_names, 'perfInfo.csv')
      created_files += 1
zip_files('./output/', 'vm-aws-import-files.zip')
logging.debug('Collection completed at: %s', datetime.datetime.now())
print('\nExport Completed. \n')
print('vm-aws-import-files.zip generated successfully containing {} files.'
.format(created_files))
if args.no_perf:
print('Performance data was not collected.')
if args.no_public_ip:
print('Public IP address data was not collected.')
break
elif args.collection_mode == 'ManagedService':
start_time = datetime.datetime.now()
created_files = 0
if args.resources != 'none':
aws_resource_scan.scan_aws(args.resources, region_list, args.thread_limit)
created_files = created_files + 1
else:
      print('Skipping resource collection.')
end_time = datetime.datetime.now()
time_delta = end_time - start_time
logging.info('Completing resource collection.')
logging.info(time_delta)
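    # Optional RDS deep scan: db_secrets.json is expected to hold a list of
    # regions, each with a 'region' name and a 'secrets' list; shape inferred
    # from the loop below (illustrative):
    #   [{"region": "us-east-1", "secrets": ["rds-secret-name"]}]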
    if os.path.isfile('db_secrets.json'):
      with open('db_secrets.json', 'r') as f:
        data = json.load(f)
      # Import lazily so the db package is only required when secrets exist.
      import db.rds_scanner
      scanner = db.rds_scanner.RdsScanner()
      for region in data:
        for secret in region['secrets']:
          if scanner.scan(secret, region['region']):
            created_files += 1
else:
print('Skipping database collection.')
zip_files('./output/services/', 'services-aws-import-files.zip')
logging.debug('Collection completed at: %s', datetime.datetime.now())
print('\nExport Completed. \n')
print('services-aws-import-files.zip generated successfully containing {} files.'
.format(created_files))
break