in source/code/actions/ec2_replace_instance_action.py [0:0]
def _create_replacement_instance(self):
    """
    Launches the replacement for the source instance using the new instance type.

    The arguments for launching the instance are built from the description of the source instance
    (self.instance) and from instance attributes that are retrieved with separate calls. After the launch the
    new instance is stored in self.new_instance and its id in self.new_instance_id, the "unlimited" CPU credit
    setting of the source instance is applied to the new instance if its type name starts with "t" followed by
    a digit, and the tags of the source volumes are copied to the volumes of the new instance.

    :return: None
    """

    def copy_volume_tags():
        """
        Copies the tags from the volumes of the source instance to the volumes attached to the new instance.
        If all source volumes have the same tags these are set on all new volumes with a single call, otherwise
        tags are matched per device name. Logs an error and gives up if the volumes cannot be retrieved in time.
        """
        source_volume_ids = [m["Ebs"]["VolumeId"] for m in self.instance["BlockDeviceMappings"]]

        # wait until every source volume reports an attachment, the device name is used as key for its tags
        with Timer(timeout_seconds=60) as timer:
            while True:
                source_volumes = list(self.ec2_service.describe(services.ec2_service.VOLUMES,
                                                                VolumeIds=source_volume_ids,
                                                                region=self._region_,
                                                                tags=True))
                if all(len(v.get("Attachments", [])) > 0 for v in source_volumes):
                    source_device_tags = {v["Attachments"][0]["Device"]: v.get("Tags", {}) for v in source_volumes}
                    break
                if timer.timeout:
                    self._logger_.error(ERR_GETTING_SOURCE_TAGS, self.instance_id)
                    source_device_tags = {}
                    break
                time.sleep(10)

        all_volume_tags = list(source_device_tags.values())

        # nothing to do if none of the source volumes has tags (also the case after the timeout above)
        if not any(len(volume_tags) > 0 for volume_tags in all_volume_tags):
            self._logger_.info(INF_NO_VOLUME_TAGS_TO_COPY, self.new_instance_id)
            return

        all_same_tags = all(volume_tags == all_volume_tags[0] for volume_tags in all_volume_tags)

        # wait until at least one volume is reported as attached to the new instance
        with Timer(60) as timer:
            while True:
                volumes = list(self.ec2_service.describe(services.ec2_service.VOLUMES,
                                                         region=self._region_,
                                                         Filters=[
                                                             {
                                                                 "Name": "attachment.instance-id",
                                                                 "Values": [self.new_instance_id]
                                                             }]))
                if len(volumes) > 0:
                    break
                if timer.timeout:
                    self._logger_.error(ERR_GETTING_NEW_INST_VOLUMES, self.new_instance_id)
                    return
                time.sleep(10)

        if all_same_tags:
            # identical tags on every source volume, tag all new volumes in one call
            volume_ids = [v["VolumeId"] for v in volumes]
            tags = all_volume_tags[0]
            self._logger_.info(INF_COPY_TAGS_SAME, tags, ", ".join(volume_ids),
                               self.new_instance_id)
            self.ec2_client.create_tags_with_retries(Resources=volume_ids, Tags=tag_key_value_list(tags))
        else:
            # tags differ per volume, match source and new volumes by their device name
            devices = {v["Attachments"][0]["Device"]: v["VolumeId"] for v in volumes}
            for device, volume_id in devices.items():
                tags = source_device_tags.get(device, {})
                if len(tags) > 0:
                    self._logger_.info(INF_COPY_TAGS_PER_VOLUME, tags, volume_id, self.new_instance_id)
                    self.ec2_client.create_tags_with_retries(Resources=[volume_id],
                                                             Tags=tag_key_value_list(tags))

    def get_instance_attribute(attribute, result_key):
        """
        Retrieves a single attribute of the source instance.
        :param attribute: Name of the attribute to retrieve
        :param result_key: Key of the attribute value in the returned data
        :return: Value stored under result_key, None if it is not present
        """
        return self.ec2_service.get(services.ec2_service.INSTANCE_ATTRIBUTE,
                                    InstanceId=self.instance_id,
                                    region=self._region_,
                                    Attribute=attribute).get(result_key)

    def get_ebs_optimized():
        """
        Returns the EBS optimization setting for the new instance.
        :return: False for t2 types and types listed as not supporting EBS optimization, else the setting of
        the source instance
        """
        if self.new_instance_type.startswith(
                "t2.") or self.new_instance_type in INSTANCES_TYPES_NOT_SUPPORTING_EBS_OPTIMIZATION:
            return False
        return self.instance["EbsOptimized"]

    def get_network_interfaces():
        """
        Builds the network interface specifications for the new instance from the interfaces of the source.
        :return: List with a specification for every network interface of the source instance
        """
        result = [
            {
                "AssociatePublicIpAddress": network_interface.get("Association", {}).get("PublicIp") is not None,
                "DeleteOnTermination": network_interface["Attachment"]["DeleteOnTermination"],
                "Description": network_interface.get("Description", ""),
                "DeviceIndex": int(network_interface["Attachment"]["DeviceIndex"]),
                "Groups": [g["GroupId"] for g in network_interface["Groups"]],
                "Ipv6AddressCount": len(network_interface["Ipv6Addresses"]),
                # the first private address is the primary one, only additional addresses are counted
                "SecondaryPrivateIpAddressCount": max(len(network_interface["PrivateIpAddresses"]) - 1, 0),
                "SubnetId": network_interface["SubnetId"]
            } for network_interface in self.instance["NetworkInterfaces"]
        ]
        # leave the count out of the specification if there are no secondary addresses
        for specification in result:
            if specification["SecondaryPrivateIpAddressCount"] == 0:
                del specification["SecondaryPrivateIpAddressCount"]
        return result

    args = {
        "ImageId": self.instance["ImageId"],
        "InstanceType": self.new_instance_type,
        "MaxCount": 1,
        "MinCount": 1,
        "Monitoring": {"Enabled": self.instance["Monitoring"]["State"] == "enabled"},
        "DisableApiTermination": get_instance_attribute("disableApiTermination", "DisableApiTermination"),
        "InstanceInitiatedShutdownBehavior": get_instance_attribute("instanceInitiatedShutdownBehavior",
                                                                    "InstanceInitiatedShutdownBehavior"),
        "EbsOptimized": get_ebs_optimized(),
        "NetworkInterfaces": get_network_interfaces()
    }

    # optional settings that are copied as-is from the description of the source instance
    for optional_key in ("KeyName", "Placement", "HibernationOptions"):
        if self.instance.get(optional_key) is not None:
            args[optional_key] = self.instance[optional_key]

    # the instance description lists its license configurations under "Licenses", run_instances expects them
    # as "LicenseSpecifications", a list of items that each hold a "LicenseConfigurationArn"
    licenses = self.instance.get("Licenses")
    if licenses:
        args["LicenseSpecifications"] = [
            {"LicenseConfigurationArn": lic["LicenseConfigurationArn"]} for lic in licenses
        ]

    kernel_id = get_instance_attribute("kernel", "Kernel")
    if kernel_id is not None:
        args["KernelId"] = kernel_id

    # the ramdisk is tested on its own value, not on the kernel id
    ramdisk_id = get_instance_attribute("ramdisk", "RamdiskId")
    if ramdisk_id is not None:
        args["RamdiskId"] = ramdisk_id

    # user data is retrieved base64 encoded and passed decoded when launching the new instance
    userdata = get_instance_attribute("userData", "UserData")
    if userdata not in ["", None, {}]:
        args["UserData"] = base64.b64decode(userdata)

    instance_profile = self.instance.get("IamInstanceProfile")
    if instance_profile not in [None, {}]:
        args["IamInstanceProfile"] = {"Arn": instance_profile["Arn"]}

    capacity_reservation_specification = self.instance.get("CapacityReservationSpecification")
    if capacity_reservation_specification not in [None, {}]:
        args["CapacityReservationSpecification"] = capacity_reservation_specification

    self._test_simulate_insufficient_instance_capacity()

    self.new_instance = self.ec2_client.run_instances_with_retries(**args)["Instances"][0]
    self.new_instance_id = self.new_instance["InstanceId"]

    # apply unlimited cpu credits of the source instance if the new type name starts with "t" and a digit
    cpu_credits = self.ec2_service.get(services.ec2_service.INSTANCE_CREDIT_SPECIFICATIONS,
                                       region=self._region_,
                                       InstanceIds=[self.instance_id]).get("CpuCredits")
    if cpu_credits == "unlimited" and regex.match(r"^t\d.", self.new_instance_type) is not None:
        self.ec2_client.modify_instance_credit_specification_with_retries(InstanceCreditSpecifications=[
            {
                'InstanceId': self.new_instance_id,
                'CpuCredits': "unlimited"
            },
        ])

    copy_volume_tags()