in tools/vm-migrator/src/migrator/subnet_region_migrator.py [0:0]
def _alias_ip_ranges_from_row(row, max_ranges=4):
    """Rebuild the alias IP range dicts stored in a row's numbered columns.

    Reads the columns range_name_N, alias_ip_name_N and alias_ip_N for
    N = 1..max_ranges. A range is only emitted when its alias_ip_N column
    is non-empty.
    """
    alias_ip_ranges = []
    for index in range(1, max_ranges + 1):
        suffix = str(index)
        alias_range = {}
        if row['range_name_' + suffix] != '':
            alias_range['subnetworkRangeName'] = row['range_name_' + suffix]
        if row['alias_ip_name_' + suffix]:
            alias_range['aliasIpName'] = row['alias_ip_name_' + suffix]
        if row['alias_ip_' + suffix]:
            alias_range['ipCidrRange'] = row['alias_ip_' + suffix]
            alias_ip_ranges.append(alias_range)
    return alias_ip_ranges


def _disks_from_row(row, target_instance_uri, max_disks=9):
    """Extract a row's disks from its numbered columns.

    Reads device_name_N, disk_name_N and disk_labels_N for N = 1..max_disks.

    Returns:
        A pair (disk_names, labels_by_disk_uri): disk_names maps device name
        to disk name; labels_by_disk_uri maps the disk URI in the target
        instance's project and zone to the raw disk_labels_N column value.
    """
    disk_names = {}
    labels_by_disk_uri = {}
    for index in range(1, max_disks + 1):
        suffix = str(index)
        if row['device_name_' + suffix] != '':
            disk_name = row['disk_name_' + suffix]
            disk_names[row['device_name_' + suffix]] = disk_name
            if row['disk_labels_' + suffix] != '':
                target_disk_uri = uri.Disk(target_instance_uri.project,
                                           target_instance_uri.zone,
                                           disk_name).uri
                labels_by_disk_uri[target_disk_uri] = row['disk_labels_' +
                                                          suffix]
    return disk_names, labels_by_disk_uri


def _wait_for_all(futures, success_message, failure_message, total) -> bool:
    """Wait for every future, logging each outcome.

    Returns:
        True only if no future raised an exception.
    """
    all_succeeded = True
    completed = 0
    for future in concurrent.futures.as_completed(futures):
        try:
            resource_name = future.result()
        except Exception as exc:
            # A worker may raise anything; record the failure and keep
            # draining the remaining futures instead of aborting the batch.
            logging.error(failure_message, exc)
            all_succeeded = False
        else:
            completed += 1
            logging.info(success_message, resource_name, completed, total)
    return all_succeeded


def bulk_create_instances(file_name, target_project, target_service_account,
                          target_scopes, target_subnet_uri: uri.Subnet,
                          source_project, retain_ip) -> bool:
    """Create one target instance per CSV row, then apply disk labels.

    Every row is submitted to a thread pool as an instance.create call.
    Disk labels are only applied (via disk.setLabels) when all submitted
    instance creations finished without raising.

    Rows whose mapped target zone is outside the region of
    target_subnet_uri are logged as errors and skipped; skipping a row does
    not affect the return value.

    Args:
        file_name: Path of the CSV file to read. Each row must provide the
            columns self_link, name, network, machine_type, node_group,
            range_name_N / alias_ip_name_N / alias_ip_N for N = 1..4,
            device_name_N / disk_name_N / disk_labels_N for N = 1..9 and,
            when retain_ip is truthy, internal_ip.
        target_project: Project used for the target instance and machine
            type URIs.
        target_service_account: Passed through to instance.create.
        target_scopes: Passed through to instance.create.
        target_subnet_uri: Subnet handed to subnet.get_network and to
            instance.create; its region must match the target instance's.
        source_project: Passed through to instance.create.
        retain_ip: When truthy, the row's internal_ip is passed to
            instance.create; otherwise None is passed.

    Returns:
        False if any instance creation raised, or if any disk label update
        raised; True otherwise.

    Raises:
        Exceptions raised while reading or preparing a row (for example a
        missing CSV column) are not caught and propagate to the caller.
    """
    target_network = subnet.get_network(target_subnet_uri)
    disk_labels = {}
    # newline='' is what the csv module asks for so that quoted fields with
    # embedded line breaks are parsed correctly.
    with open(file_name, 'r', newline='') as read_obj:
        csv_dict_reader = DictReader(read_obj)
        # The with statement ensures worker threads are cleaned up promptly.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=100) as executor:
            instance_futures = []
            for row in csv_dict_reader:
                ip = row['internal_ip'] if retain_ip else None

                # A network resolved from the target subnet overrides the
                # one recorded in the CSV.
                if target_network:
                    row['network'] = target_network

                source_instance_uri = uri.Instance.from_uri(row['self_link'])
                target_instance_uri = uri.Instance(
                    project=target_project,
                    zone=zone_mapping.FIND[source_instance_uri.zone],
                    name=row['name']
                )

                alias_ip_ranges = _alias_ip_ranges_from_row(row)
                disk_names, row_disk_labels = _disks_from_row(
                    row, target_instance_uri)
                disk_labels.update(row_disk_labels)

                node_group = row['node_group'] or None

                source_machine_type_uri = uri.MachineType.from_uri(
                    row['machine_type'])
                # Keep the source machine type when no mapping is defined.
                target_machine_type_uri = uri.MachineType(
                    project=target_project,
                    machine_type=machine_type_mapping.FIND.get(
                        source_machine_type_uri.machine_type,
                        source_machine_type_uri.machine_type),
                    zone=target_instance_uri.zone
                )

                if target_subnet_uri.region != target_instance_uri.region:
                    # Report the subnet's region: that is the value being
                    # compared and the one the message refers to.
                    logging.error(
                        'Instance zone mapping from %s to %s is outside the '
                        'target region %s', source_instance_uri.zone,
                        target_instance_uri.zone, target_subnet_uri.region)
                    continue

                instance_futures.append(
                    executor.submit(instance.create, target_instance_uri,
                                    row['network'], target_subnet_uri,
                                    alias_ip_ranges, node_group, disk_names,
                                    ip, target_machine_type_uri,
                                    source_project, target_service_account,
                                    target_scopes))

            instances_created = _wait_for_all(
                instance_futures,
                '%r instance %i out of %i created '
                'sucessfully',
                'Instance creation generated an exception: %s',
                len(instance_futures))
            if not instances_created:
                return False

            # Label updates reuse the same pool, so they must be submitted
            # before the executor's with block exits.
            label_futures = [
                executor.submit(disk.setLabels, uri.Disk.from_uri(disk_uri),
                                json.loads(labels))
                for disk_uri, labels in disk_labels.items()
            ]
            return _wait_for_all(
                label_futures,
                '%r disk %i out of %i updated with labels '
                'sucessfully',
                'Disk update generated an exception: %s',
                len(disk_labels))