def _create_replacement_instance()

in source/code/actions/ec2_replace_instance_action.py [0:0]


    def _create_replacement_instance(self):
        """
        Launches a new instance of type self.new_instance_type that is configured like the
        instance described by self.instance, then copies the volume tags of the original
        instance to the volumes of the new instance.

        The launch arguments are built from the description held in self.instance and from
        instance attributes (user data, termination protection, kernel, ramdisk, shutdown
        behavior) that are retrieved for self.instance_id.

        Sets self.new_instance and self.new_instance_id. If the original instance uses
        "unlimited" CPU credits and the new instance type matches the t-family pattern,
        the new instance is also set to "unlimited".

        :return: None
        """

        def copy_volume_tags():
            """
            Copies the tags of the volumes of the original instance to the volumes of the
            new instance. Volumes are matched by the device name of their first attachment,
            unless all source volumes carry identical tags, in which case these tags are
            applied to all volumes of the new instance in a single call.
            """
            source_volume_ids = [m["Ebs"]["VolumeId"] for m in self.instance["BlockDeviceMappings"]]

            # the device name is taken from the attachment data, so poll until every source
            # volume reports at least one attachment or the timer expires
            with Timer(timeout_seconds=60) as source_timer:
                while True:
                    source_volumes = list(self.ec2_service.describe(services.ec2_service.VOLUMES,
                                                                    VolumeIds=source_volume_ids,
                                                                    region=self._region_,
                                                                    tags=True))
                    if all(len(v.get("Attachments", [])) > 0 for v in source_volumes):
                        source_device_tags = {v["Attachments"][0]["Device"]: v.get("Tags", {}) for v in source_volumes}
                        break

                    if source_timer.timeout:
                        self._logger_.error(ERR_GETTING_SOURCE_TAGS, self.instance_id)
                        source_device_tags = {}
                        break

                    time.sleep(10)

            all_volume_tags = list(source_device_tags.values())

            # nothing to do if none of the source volumes has tags (or the tags could not be retrieved)
            if not any(len(volume_tags) > 0 for volume_tags in all_volume_tags):
                self._logger_.info(INF_NO_VOLUME_TAGS_TO_COPY, self.new_instance_id)
                return

            # identical tags on all source volumes can be applied with a single create_tags call
            all_same_tags = all(volume_tags == all_volume_tags[0] for volume_tags in all_volume_tags)

            # poll until the volumes attached to the new instance are returned or the timer expires
            with Timer(60) as target_timer:
                while True:
                    volumes = list(self.ec2_service.describe(services.ec2_service.VOLUMES,
                                                             region=self._region_,
                                                             Filters=[
                                                                 {
                                                                     "Name": "attachment.instance-id",
                                                                     "Values": [self.new_instance_id]
                                                                 }]))
                    if len(volumes) > 0:
                        break

                    if target_timer.timeout:
                        self._logger_.error(ERR_GETTING_NEW_INST_VOLUMES, self.new_instance_id)
                        return

                    time.sleep(10)

            if all_same_tags:
                volume_ids = [v["VolumeId"] for v in volumes]
                tags = all_volume_tags[0]
                self._logger_.info(INF_COPY_TAGS_SAME, tags, ", ".join(volume_ids),
                                   self.new_instance_id)
                self.ec2_client.create_tags_with_retries(Resources=volume_ids, Tags=tag_key_value_list(tags))
            else:
                # tag each new volume with the tags of the source volume that used the same device name
                devices = {v["Attachments"][0]["Device"]: v["VolumeId"] for v in volumes}
                for device, volume_id in devices.items():
                    tags = source_device_tags.get(device, {})
                    if len(tags) > 0:
                        self._logger_.info(INF_COPY_TAGS_PER_VOLUME, tags, volume_id, self.new_instance_id)
                        self.ec2_client.create_tags_with_retries(Resources=[volume_id],
                                                                 Tags=tag_key_value_list(tags))

        def get_instance_attribute(attribute, result_key):
            """
            Retrieves the named attribute of the original instance and returns the entry
            stored under result_key in the result, or None if that entry is not present.
            """
            return self.ec2_service.get(services.ec2_service.INSTANCE_ATTRIBUTE,
                                        InstanceId=self.instance_id,
                                        region=self._region_,
                                        Attribute=attribute).get(result_key)

        def get_user_data():
            return get_instance_attribute("userData", "UserData")

        def get_termination_protection():
            return get_instance_attribute("disableApiTermination", "DisableApiTermination")

        def get_kernel():
            return get_instance_attribute("kernel", "Kernel")

        def get_ramdisk():
            return get_instance_attribute("ramdisk", "RamdiskId")

        def get_shutdown_behavior():
            return get_instance_attribute("instanceInitiatedShutdownBehavior", "InstanceInitiatedShutdownBehavior")

        def get_ebs_optimized():
            """
            Returns False for t2 types and types listed as not supporting EBS optimization,
            otherwise the setting of the original instance.
            """
            if self.new_instance_type.startswith(
                    "t2.") or self.new_instance_type in INSTANCES_TYPES_NOT_SUPPORTING_EBS_OPTIMIZATION:
                return False
            return self.instance["EbsOptimized"]

        def get_network_interfaces():
            """
            Builds the network interface specifications for the new instance from the
            network interfaces of the original instance.
            """
            result = [
                {
                    "AssociatePublicIpAddress": network_interface.get("Association", {}).get("PublicIp") is not None,
                    "DeleteOnTermination": network_interface["Attachment"]["DeleteOnTermination"],
                    "Description": network_interface.get("Description", ""),
                    "DeviceIndex": int(network_interface["Attachment"]["DeviceIndex"]),
                    "Groups": [g["GroupId"] for g in network_interface["Groups"]],
                    "Ipv6AddressCount": len(network_interface["Ipv6Addresses"]),
                    # the primary private address is not counted as a secondary address
                    "SecondaryPrivateIpAddressCount": max(len(network_interface["PrivateIpAddresses"]) - 1, 0),
                    "SubnetId": network_interface["SubnetId"]
                } for network_interface in self.instance["NetworkInterfaces"]
            ]
            # leave the count out of the specification when there are no secondary addresses
            for interface_spec in result:
                if interface_spec["SecondaryPrivateIpAddressCount"] == 0:
                    del interface_spec["SecondaryPrivateIpAddressCount"]
            return result

        args = {
            "ImageId": self.instance["ImageId"],
            "InstanceType": self.new_instance_type,
            "MaxCount": 1,
            "MinCount": 1,
            "Monitoring": {"Enabled": self.instance["Monitoring"]["State"] == "enabled"},
            "DisableApiTermination": get_termination_protection(),
            "InstanceInitiatedShutdownBehavior": get_shutdown_behavior(),
            "EbsOptimized": get_ebs_optimized(),
            "NetworkInterfaces": get_network_interfaces()
        }

        # optional settings that are copied as-is when present on the original instance
        for optional_key in ("KeyName", "Placement", "HibernationOptions"):
            if self.instance.get(optional_key) is not None:
                args[optional_key] = self.instance[optional_key]

        # the instance description lists its license configurations under "Licenses", the launch
        # request expects them as "LicenseSpecifications", a list of {"LicenseConfigurationArn": arn}
        licenses = self.instance.get("Licenses")
        if licenses:
            args["LicenseSpecifications"] = [
                {"LicenseConfigurationArn": lic["LicenseConfigurationArn"]} for lic in licenses
            ]

        kernel_id = get_kernel()
        if kernel_id is not None:
            args["KernelId"] = kernel_id

        # only pass a ramdisk if one was actually returned for the original instance
        ramdisk_id = get_ramdisk()
        if ramdisk_id is not None:
            args["RamdiskId"] = ramdisk_id

        userdata = get_user_data()
        if userdata not in ["", None, {}]:
            args["UserData"] = base64.b64decode(userdata)

        instance_profile = self.instance.get("IamInstanceProfile")
        if instance_profile not in [None, {}]:
            args["IamInstanceProfile"] = {"Arn": instance_profile["Arn"]}

        capacity_reservation_specification = self.instance.get("CapacityReservationSpecification")
        if capacity_reservation_specification not in [None, {}]:
            args["CapacityReservationSpecification"] = capacity_reservation_specification

        # NOTE(review): judging by its name this is a test hook that simulates a capacity error - confirm
        self._test_simulate_insufficient_instance_capacity()
        self.new_instance = self.ec2_client.run_instances_with_retries(**args)["Instances"][0]
        self.new_instance_id = self.new_instance["InstanceId"]

        # the credit specification is read from the original instance
        cpu_credits = self.ec2_service.get(services.ec2_service.INSTANCE_CREDIT_SPECIFICATIONS,
                                           region=self._region_,
                                           InstanceIds=[self.instance_id]).get("CpuCredits")

        # the "." in the pattern is unescaped and matches any character, so type names
        # such as "t3a.large" match as well as "t3.large"
        if cpu_credits == "unlimited" and regex.match(r"^t\d.", self.new_instance_type) is not None:
            self.ec2_client.modify_instance_credit_specification_with_retries(InstanceCreditSpecifications=[
                {
                    'InstanceId': self.new_instance_id,
                    'CpuCredits': "unlimited"
                },
            ])

        copy_volume_tags()