def provision_dcv_host_for_session()

in source/idea/idea-virtual-desktop-controller/src/ideavirtualdesktopcontroller/app/virtual_desktop_controller_utils.py [0:0]


    def provision_dcv_host_for_session(self, session: VirtualDesktopSession) -> dict:
        """
        Launch the EC2 instance that will host the given virtual desktop session.

        The method assembles the instance tags, resolves an ordered list of
        candidate subnets and then calls ``ec2_client.run_instances`` once per
        candidate subnet until a call succeeds.

        Tag precedence (lowest to highest): ``global-settings.custom_tags``,
        the built-in tags (which project tags may override), session tags.

        Subnet candidates, in order of preference:
          1. ``session.server.subnet_id`` when set (single, strict candidate)
          2. ``vdc.dcv_session.network.private_subnets`` when non-empty
          3. ``cluster.network.private_subnets``
        The candidates are shuffled when
        ``vdc.dcv_session.network.randomize_subnets`` is enabled.

        :param session: session to provision a host for; its ``server``,
            ``software_stack``, ``project``, ``tags``, ``name`` and ``owner``
            fields are read.
        :return: the ``run_instances`` response passed through ``Utils.to_dict``.
        :raises ValueError: when no candidate subnet is available.
        :raises Exception: the error raised during a launch attempt is re-raised
            when ``vdc.dcv_session.network.subnet_autoretry`` is disabled or the
            last candidate subnet has been tried.
        """
        cluster_name = self.context.cluster_name()
        module_id = self.context.module_id()

        tags = {
            constants.IDEA_TAG_NAME: f'{cluster_name}-{session.name}-{session.owner}',
            constants.IDEA_TAG_NODE_TYPE: constants.NODE_TYPE_DCV_HOST,
            constants.IDEA_TAG_ENVIRONMENT_NAME: cluster_name,
            constants.IDEA_TAG_MODULE_ID: module_id,
            constants.IDEA_TAG_MODULE_NAME: self.context.module_name(),
            constants.IDEA_TAG_MODULE_VERSION: self.context.module_version(),
            constants.IDEA_TAG_BACKUP_PLAN: f'{cluster_name}-{module_id}',
            constants.IDEA_TAG_PROJECT: session.project.name,
            constants.IDEA_TAG_DCV_SESSION_ID: 'TBD'
        }

        # Project tags are applied on top of the built-in tags
        if Utils.is_not_empty(session.project.tags):
            for tag in session.project.tags:
                tags[tag.key] = tag.value

        custom_tags = self.context.config().get_list('global-settings.custom_tags', [])
        custom_tags_dict = Utils.convert_custom_tags_to_key_value_pairs(custom_tags)
        session_tags = Utils.convert_tags_list_of_dict_to_tags_dict(session.tags)
        # Later entries win: custom tags < built-in/project tags < session tags
        tags = {
            **custom_tags_dict,
            **tags,
            **session_tags
        }

        aws_tags = [{'Key': key, 'Value': value} for key, value in tags.items()]

        metadata_http_tokens = self.context.config().get_string('virtual-desktop-controller.dcv_session.metadata_http_tokens', required=True)

        kms_key_id = self.context.config().get_string('cluster.ebs.kms_key_id', required=False, default=None)
        if kms_key_id is None:
            kms_key_id = 'alias/aws/ebs'

        # Handle subnet processing for eVDI hosts

        randomize_subnet_method = self.context.config().get_bool(
            'vdc.dcv_session.network.randomize_subnets',
            default=Utils.get_as_bool(constants.DEFAULT_VDI_RANDOMIZE_SUBNETS, default=False)
        )
        subnet_autoretry_method = self.context.config().get_bool(
            'vdc.dcv_session.network.subnet_autoretry',
            default=Utils.get_as_bool(constants.DEFAULT_VDI_SUBNET_AUTORETRY, default=True)
        )
        # Determine if we have a specific list of subnets configured for VDI
        configured_vdi_subnets = self.context.config().get_list(
            'vdc.dcv_session.network.private_subnets',
            default=[]
        )
        # Required=True as these should always be available, and we want to error otherwise
        cluster_private_subnets = self.context.config().get_list(
            'cluster.network.private_subnets',
            required=True
        )

        # NOTE: the candidate list is shuffled and consumed below, so always work
        # on a private copy - never on the list object handed out by the config.
        if Utils.is_not_empty(session.server.subnet_id):
            # this comes in as a string from the API
            self._logger.debug(f"Using strict requested subnet_id: {session.server.subnet_id}")
            attempt_subnets = [session.server.subnet_id]
        elif configured_vdi_subnets:
            # A list from the config
            self._logger.debug(f"Found configured VDI subnets: {', '.join(configured_vdi_subnets)}")
            attempt_subnets = list(configured_vdi_subnets)
        else:
            # fallback to a list of cluster private_subnets
            self._logger.debug(f"Fallback to cluster private_subnets: {', '.join(cluster_private_subnets)}")
            attempt_subnets = list(cluster_private_subnets)

        if not attempt_subnets:
            # Fail with an explicit reason instead of an IndexError from pop() below
            raise ValueError(
                f'No subnets available to provision a DCV host for session: {session.name}. '
                f'Check vdc.dcv_session.network.private_subnets and cluster.network.private_subnets.'
            )

        # Shuffle the list if configured for random subnet
        if randomize_subnet_method:
            random.shuffle(attempt_subnets)
            self._logger.debug("Applying randomize to subnet list due to configuration.")

        # At this stage attempt_subnets contains the subnets
        # we want to attempt in the order that we prefer
        # (ordered or pre-shuffled)

        placement = {
            "Tenancy": session.software_stack.placement.tenancy if session.software_stack.placement else VirtualDesktopTenancy.DEFAULT,
        }
        if placement["Tenancy"] == VirtualDesktopTenancy.HOST:
            placement["Affinity"] = session.software_stack.placement.affinity
            # An explicit host id is used when present; otherwise the host resource group
            if session.software_stack.placement.host_id:
                placement["HostId"] = session.software_stack.placement.host_id
            else:
                placement["HostResourceGroupArn"] = session.software_stack.placement.host_resource_group_arn

        self._logger.debug(f"Deployment Attempt Ready - Retry: {subnet_autoretry_method} Attempt_Subnets({len(attempt_subnets)}): {', '.join(attempt_subnets)}")

        attempt_number = 0
        subnets_remain = True

        while subnets_remain:
            attempt_number += 1
            # We just .pop(0) since the list has been randomized already if it was requested
            subnet_to_try = attempt_subnets.pop(0)
            remaining_subnet_count = len(attempt_subnets)

            self._logger.info(f"Deployment attempt #{attempt_number} subnet_id: {subnet_to_try} Remaining Subnets: {remaining_subnet_count}")
            subnets_remain = remaining_subnet_count > 0
            if not subnets_remain:
                # this is our last attempt
                self._logger.debug("Final deployment attempt (_remaining_subnet_count == 0)")

            response = None

            try:
                response = self.ec2_client.run_instances(
                    UserData=self._build_userdata(session),
                    ImageId=session.software_stack.ami_id,
                    InstanceType=session.server.instance_type,
                    TagSpecifications=[
                        {
                            'ResourceType': 'instance',
                            'Tags': aws_tags
                        }
                    ],
                    MaxCount=1,
                    MinCount=1,
                    NetworkInterfaces=[
                        {
                            'DeviceIndex': 0,
                            'AssociatePublicIpAddress': False,
                            'SubnetId': subnet_to_try,
                            'Groups': session.server.security_groups
                        }
                    ],
                    IamInstanceProfile={
                        'Arn': session.server.instance_profile_arn
                    },
                    BlockDeviceMappings=[
                        {
                            'DeviceName': Utils.get_ec2_block_device_name(session.software_stack.base_os.value),
                            'Ebs': {
                                'DeleteOnTermination': True,
                                'VolumeSize': session.server.root_volume_size.int_val(),
                                'Encrypted': Utils.get_as_bool(constants.DEFAULT_VOLUME_ENCRYPTION_VDI, default=True),
                                'KmsKeyId': kms_key_id,
                                'VolumeType': Utils.get_as_string(constants.DEFAULT_VOLUME_TYPE_VDI, default='gp3')
                            }
                        }
                    ],
                    KeyName=session.server.key_pair_name,
                    HibernationOptions={
                        'Configured': False if Utils.is_empty(session.hibernation_enabled) else session.hibernation_enabled
                    },
                    MetadataOptions={
                        'HttpTokens': metadata_http_tokens,
                        'HttpEndpoint': 'enabled'
                    },
                    Placement=placement,
                )
            except Exception as err:
                self._logger.warning(f"Encountered Deployment Exception: {err} / Response: {response}")
                self._logger.debug(f"Remaining subnets {len(attempt_subnets)}: {attempt_subnets}")
                if subnet_autoretry_method and subnets_remain:
                    self._logger.debug("Continue with next attempt with remaining subnets..")
                    continue
                self._logger.warning("Exhausted all deployment attempts. Cannot continue with this request.")
                # Bare raise re-raises the active exception
                raise  # session failure reason will be shown to user

            # A falsy response falls through to the next candidate subnet
            # (or ends the loop when this was the last one).
            if response:
                self._logger.debug(f"Returning response: {response}")
                return Utils.to_dict(response)