source/idea/idea-virtual-desktop-controller/src/ideavirtualdesktopcontroller/app/virtual_desktop_controller_utils.py (570 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
# and limitations under the License.
import logging
import os
import random
from threading import RLock
from typing import Dict, List, Optional, Tuple

from botocore.exceptions import ClientError

import ideavirtualdesktopcontroller
from ideadatamodel import constants, VirtualDesktopArchitecture, Project
from ideadatamodel import (
    VirtualDesktopSession,
    VirtualDesktopBaseOS,
    BaseOS,
    VirtualDesktopGPU,
    VirtualDesktopTenancy,
    SocaMemory,
    SocaMemoryUnit,
    VirtualDesktopSoftwareStack
)
from ideasdk.bootstrap import BootstrapPackageBuilder, BootstrapUserDataBuilder, BootstrapUtils
from ideasdk.context import BootstrapContext
from ideasdk.launch_configurations import ScriptOSType, ScriptEventType
from ideasdk.metrics import CloudWatchAgentLogFileOptions
from ideasdk.utils import Utils, GroupNameHelper
from ideavirtualdesktopcontroller.app.clients.events_client.events_client import VirtualDesktopEventType
from ideavirtualdesktopcontroller.app.events.events_utils import EventsUtils
class VirtualDesktopControllerUtils:
    def __init__(self, context: ideavirtualdesktopcontroller.AppContext):
        """Shared helper utilities for the virtual desktop controller.

        Wires up the AWS service clients, the events helper, and the cache
        keys / lock used to memoize EC2 instance-type metadata.
        """
        self.context = context
        self._logger = self.context.logger('virtual-desktop-controller-utils')
        # AWS service clients resolved through the application context.
        self.s3_client = self.context.aws().s3()
        self.ec2_client = self.context.aws().ec2()
        self.eventbridge_client = self.context.aws().eventbridge()
        self.ssm_client = self.context.aws().ssm()
        self.sqs_client = self.context.aws().sqs()
        self.events_utils = EventsUtils(context=self.context)
        # Long-term cache keys for the EC2 DescribeInstanceTypes results.
        self.INSTANCE_TYPES_NAMES_LIST_CACHE_KEY = 'aws.ec2.all-instance-types-names-list'
        self.INSTANCE_INFO_CACHE_KEY = 'aws.ec2.all-instance-types-data'
        # Guards cache population so concurrent callers refresh it only once.
        self.instance_types_lock = RLock()
        self.group_name_helper = GroupNameHelper(self.context)
def create_tag(self, instance_id: str, tag_key: str, tag_value: str):
self.ec2_client.create_tags(
Resources=[
instance_id
],
Tags=[{
'Key': tag_key,
'Value': tag_value
}]
)
    def _build_and_upload_bootstrap_package(self, session: VirtualDesktopSession) -> str:
        """Build the DCV host bootstrap package for *session* and upload it to S3.

        Returns the ``s3://`` URI of the uploaded bootstrap archive, which is
        later passed to the user data builder for the DCV host instance.
        """
        bootstrap_context = BootstrapContext(
            config=self.context.config(),
            module_name=constants.MODULE_VIRTUAL_DESKTOP_CONTROLLER,
            module_id=self.context.module_id(),
            module_set=self.context.module_set(),
            base_os=session.software_stack.base_os.value,
            instance_type=session.server.instance_type
        )
        # Attach CloudWatch agent logging/metrics config for the in-guest VDI app.
        BootstrapUtils.check_and_attach_cloudwatch_logging_and_metrics(
            bootstrap_context=bootstrap_context,
            metrics_namespace=f'{self.context.cluster_name()}/vdi-app/{session.idea_session_id}',
            node_type=constants.NODE_TYPE_APP,
            enable_logging=self.context.config().get_bool('virtual-desktop-app.cloudwatch_logs.enabled', False),
            log_files=[
                CloudWatchAgentLogFileOptions(
                    file_path='/opt/idea/app/logs/**.log',
                    log_group_name=f'/{self.context.cluster_name()}/vdi-app/{session.idea_session_id}',
                    log_stream_name='application_{ip_address}'
                )
            ]
        )
        cluster_s3_bucket = self.context.config().get_string('cluster.cluster_s3_bucket', required=True)
        # Variables consumed by the bootstrap scripts on the DCV host.
        bootstrap_context.vars.session_owner = session.owner
        bootstrap_context.vars.idea_session_id = session.idea_session_id
        bootstrap_context.vars.project = session.project.name
        bootstrap_context.vars.cognito_min_id = constants.COGNITO_MIN_ID_INCLUSIVE
        bootstrap_context.vars.cognito_max_id = constants.COGNITO_MAX_ID_INCLUSIVE
        bootstrap_context.vars.cognito_uid_attribute = constants.COGNITO_UID_ATTRIBUTE
        bootstrap_context.vars.cognito_default_user_group = constants.COGNITO_DEFAULT_USER_GROUP
        bootstrap_context.vars.app_package_uri = f's3://{cluster_s3_bucket}/idea/releases/idea-virtual-desktop-{self.context.module_version()}.tar.gz'
        # Quote-escape character differs by shell: backslash for bash,
        # backtick for PowerShell on Windows.
        if session.software_stack.base_os != VirtualDesktopBaseOS.WINDOWS:
            escape_chars = '\\'
        else:
            escape_chars = '`'
        # TODO: Deprecate
        # JSON payload (with shell-escaped quotes) emitted by the host once DCV is ready.
        bootstrap_context.vars.dcv_host_ready_message = f'{{{escape_chars}"event_group_id{escape_chars}":{escape_chars}"{session.idea_session_id}{escape_chars}",{escape_chars}"event_type{escape_chars}":{escape_chars}"{VirtualDesktopEventType.DCV_HOST_READY_EVENT}{escape_chars}",{escape_chars}"detail{escape_chars}":{{{escape_chars}"idea_session_id{escape_chars}":{escape_chars}"{session.idea_session_id}{escape_chars}",{escape_chars}"idea_session_owner{escape_chars}":{escape_chars}"{session.owner}{escape_chars}"}}}}'
        # Bootstrap components differ by OS; Windows has no separate DCV component.
        components = ['virtual-desktop-host-linux', 'nice-dcv-linux', 'vdi-helper']
        if session.software_stack.base_os == VirtualDesktopBaseOS.WINDOWS:
            components = ['virtual-desktop-host-windows', 'vdi-helper']
        # Build the archive on shared storage so it survives controller restarts.
        bootstrap_package_archive_file = BootstrapPackageBuilder(
            bootstrap_context=bootstrap_context,
            source_directory=self.context.get_bootstrap_dir(),
            target_package_basename=f'dcv-host-{session.idea_session_id}',
            components=components,
            tmp_dir=os.path.join(f'{self.context.config().get_string("shared-storage.internal.mount_dir", required=True)}', self.context.cluster_name(), self.context.module_id(), 'dcv-host-bootstrap', session.owner, f'{Utils.to_secure_filename(session.name)}-{session.idea_session_id}'),
            force_build=True,
            logger=self._logger
        ).build()
        self._logger.debug(f'{session.idea_session_id} built bootstrap package: {bootstrap_package_archive_file}')
        upload_key = f'idea/{self.context.module_id()}/dcv-host-bootstrap/{Utils.to_secure_filename(session.name)}-{session.idea_session_id}/{os.path.basename(bootstrap_package_archive_file)}'
        self._logger.debug(f'{session.idea_session_id} uploading bootstrap package: {upload_key}')
        self.s3_client.upload_file(
            Bucket=cluster_s3_bucket,
            Filename=bootstrap_package_archive_file,
            Key=upload_key
        )
        return f's3://{cluster_s3_bucket}/{upload_key}'
    def _build_userdata(self, session: VirtualDesktopSession):
        """Assemble the EC2 user data for the session's DCV host.

        Builds the per-OS install command list (custom launch scripts plus the
        host install script), then delegates to BootstrapUserDataBuilder.
        Returns the built user data string.
        """
        bootstrap_context = BootstrapContext(
            config=self.context.config(),
            module_name=constants.MODULE_VIRTUAL_DESKTOP_CONTROLLER,
            module_id=self.context.module_id(),
            module_set=self.context.module_set(),
            base_os=session.software_stack.base_os.value,
            instance_type=session.server.instance_type
        )
        # GPU family passed to install.sh (-g) so drivers can be selected.
        gpu_family = "NONE"
        if bootstrap_context.is_nvidia_gpu():
            gpu_family = "NVIDIA"
        elif bootstrap_context.is_amd_gpu():
            gpu_family = "AMD"
        # Store
        ## Add on vdi start commands here for linux here
        rerun_on_reboot = self._retrieve_rerun_on_reboot(session.project, ScriptOSType.LINUX)
        on_vdi_start_script_commands = self._retrieve_scripts_as_commands(session.project, ScriptOSType.LINUX, ScriptEventType.ON_VDI_START)
        on_vdi_configured_script_commands = self._retrieve_scripts_as_commands(session.project, ScriptOSType.LINUX, ScriptEventType.ON_VDI_CONFIGURED)
        on_vdi_start_script_store = self._store_commands_as_linux_script(on_vdi_start_script_commands, ScriptEventType.ON_VDI_START)
        on_vdi_configured_script_store = self._store_commands_as_linux_script(on_vdi_configured_script_commands, ScriptEventType.ON_VDI_CONFIGURED)
        # Lock files ensure the custom scripts run once (or on every reboot when
        # rerun_on_reboot is set and the instance_ready semaphore is present).
        lock_file = "/root/bootstrap/semaphore/custom_script.lock"
        instance_ready_lock_file = "/root/bootstrap/semaphore/instance_ready.lock"
        custom_script_check = [f"if [[ ! -f {lock_file} ]]; then"]
        if rerun_on_reboot:
            custom_script_check = [f"if [[ ! -f {lock_file} || -f {instance_ready_lock_file} ]]; then"]
        custom_script_commands = custom_script_check + [
            *on_vdi_start_script_store,
            *on_vdi_configured_script_store,
            '/bin/bash virtual-desktop-host-linux/export_launch_script_env.sh -p {0} -o {1} -n {2} -e {3} -c {4} -s {5} -r {6}'.format(session.project.project_id, session.owner, session.project.name, self.context.config().cluster_name, f'{ScriptEventType.ON_VDI_CONFIGURED}.sh', f'{ScriptEventType.ON_VDI_START}.sh', rerun_on_reboot),
            'source /etc/launch_script_environment',
            f'/bin/bash virtual-desktop-host-linux/{ScriptEventType.ON_VDI_START}.sh',
            f"echo $(date +%s) > {lock_file}",
            "fi"
        ]
        install_commands = custom_script_commands + [
            '/bin/bash virtual-desktop-host-linux/install.sh -r {0} -n {1} -g {2} -p false'.format(self.context.config().aws_region, self.context.config().cluster_name, gpu_family),
        ]
        # Windows replaces the Linux command list entirely with PowerShell equivalents.
        if session.software_stack.base_os == BaseOS.WINDOWS:
            change_directory_command = ['cd \"virtual-desktop-host-windows\"']
            on_vdi_start_script_commands = self._retrieve_scripts_as_commands(session.project, ScriptOSType.WINDOWS, ScriptEventType.ON_VDI_START)
            on_vdi_configured_script_commands = self._retrieve_scripts_as_commands(session.project, ScriptOSType.WINDOWS, ScriptEventType.ON_VDI_CONFIGURED)
            on_vdi_start_script_store = self._store_commands_as_windows_script(on_vdi_start_script_commands, ScriptEventType.ON_VDI_START)
            on_vdi_configured_script_store = self._store_commands_as_windows_script(on_vdi_configured_script_commands, ScriptEventType.ON_VDI_CONFIGURED)
            export_env_variables_commands = ['Import-Module .\\ExportLaunchScriptEnv.ps1',
                                             f'Export-EnvironmentVariables -ProjectId "{session.project.project_id}" -OwnerId "{session.owner}" -EnvName "{self.context.config().cluster_name}" -ProjectName "{session.project.name}" -OnVDIStartCommands "{ScriptEventType.ON_VDI_START}.ps1" -OnVDIConfigureCommands "{ScriptEventType.ON_VDI_CONFIGURED}.ps1"']
            on_vdi_start_script_commands = ['Import-Module .\\DownloadAndExecuteScript.ps1', '& .\\$env:ON_VDI_START_COMMANDS']
            install_commands = change_directory_command + export_env_variables_commands + on_vdi_start_script_store + on_vdi_configured_script_store + on_vdi_start_script_commands + [
                'Import-Module .\\Install.ps1',
                f'Install-WindowsEC2Instance -ConfigureForRESVDI -AWSRegion "{self.context.config().aws_region}" -ENVName "{self.context.config().cluster_name}"'
            ]
        # Optional HTTP(S) proxy settings propagated into the user data.
        https_proxy = self.context.config().get_string('cluster.network.https_proxy', required=False, default='')
        no_proxy = self.context.config().get_string('cluster.network.no_proxy', required=False, default='')
        proxy_config = {}
        if Utils.is_not_empty(https_proxy):
            proxy_config = {
                'http_proxy': https_proxy,
                'https_proxy': https_proxy,
                'no_proxy': no_proxy
            }
        user_data_builder = BootstrapUserDataBuilder(
            base_os=session.software_stack.base_os.value,
            aws_region=self.context.config().get_string('cluster.aws.region', required=True),
            bootstrap_package_uri=self._build_and_upload_bootstrap_package(session),
            install_commands=install_commands,
            proxy_config=proxy_config,
            substitution_support=False,
            bootstrap_source_dir_path=self.context.get_bootstrap_dir()
        )
        return user_data_builder.build()
def _store_commands_as_linux_script(self, commands: List[str], scriptName: str) -> List[str]:
begin = "#!/bin/bash"
script = "\n".join([begin] + commands)
return [f'echo "{script}" > virtual-desktop-host-linux/{scriptName}.sh']
def _store_commands_as_windows_script(self, commands: List[str], scriptName: str) -> List[str]:
script = "`n".join(commands)
return [f'"{script}" | Out-File -FilePath {scriptName}.ps1']
def _retrieve_scripts_as_commands(self, project: Project, os_type: ScriptOSType, script_event: ScriptEventType) -> List[str]:
scripts = project.scripts
if not scripts:
return []
script_dir = '/root/bootstrap/latest'
script_type = None
if os_type == ScriptOSType.LINUX:
script_type = scripts.linux
command_prefix = f'/bin/bash {script_dir}/virtual-desktop-host-linux/download_and_execute_script.sh'
elif os_type == ScriptOSType.WINDOWS:
script_type = scripts.windows
command_prefix = "Download-And-Execute-Script -uri"
if not script_type:
return []
event_scripts = getattr(script_type, script_event.value, None)
if not event_scripts:
return []
if os_type == ScriptOSType.LINUX:
return [f"{command_prefix} {script.script_location} {' '.join(script.arguments or [])}" for script in event_scripts]
elif os_type == ScriptOSType.WINDOWS:
return [f"{command_prefix} {script.script_location} -arguments '{' '.join(script.arguments or [])}'" for script in event_scripts]
def _retrieve_rerun_on_reboot(self, project:Project, os_type:ScriptOSType) -> bool:
scripts = project.scripts
if not scripts:
return False
script_type = None
if os_type == ScriptOSType.LINUX:
script_type = scripts.linux
elif os_type == ScriptOSType.WINDOWS:
script_type = scripts.windows
if not script_type:
return False
rerun_on_reboot = getattr(script_type, 'rerun_on_reboot', False)
return rerun_on_reboot
    def provision_dcv_host_for_session(self, session: VirtualDesktopSession) -> dict:
        """Launch the EC2 instance that hosts the DCV session.

        Builds the tag set (module tags, project tags, custom tags, session
        tags), resolves the candidate subnet list, then attempts
        ``run_instances`` across the subnets — optionally in random order and
        with retry on failure — returning the run_instances response as a dict.
        Raises the last EC2 error when every subnet attempt fails.
        NOTE(review): implicitly returns None if run_instances ever returned a
        falsy response without raising — presumed unreachable with boto3.
        """
        tags = {
            constants.IDEA_TAG_NAME: f'{self.context.cluster_name()}-{session.name}-{session.owner}',
            constants.IDEA_TAG_NODE_TYPE: constants.NODE_TYPE_DCV_HOST,
            constants.IDEA_TAG_ENVIRONMENT_NAME: self.context.cluster_name(),
            constants.IDEA_TAG_MODULE_ID: self.context.module_id(),
            constants.IDEA_TAG_MODULE_NAME: self.context.module_name(),
            constants.IDEA_TAG_MODULE_VERSION: self.context.module_version(),
            constants.IDEA_TAG_BACKUP_PLAN: f'{self.context.cluster_name()}-{self.context.module_id()}',
            constants.IDEA_TAG_PROJECT: session.project.name,
            constants.IDEA_TAG_DCV_SESSION_ID: 'TBD'
        }
        if Utils.is_not_empty(session.project.tags):
            for tag in session.project.tags:
                tags[tag.key] = tag.value
        custom_tags = self.context.config().get_list('global-settings.custom_tags', [])
        custom_tags_dict = Utils.convert_custom_tags_to_key_value_pairs(custom_tags)
        session_tags = Utils.convert_tags_list_of_dict_to_tags_dict(session.tags)
        # Precedence (later overrides earlier): custom < module/project < session tags.
        tags = {
            **custom_tags_dict,
            **tags,
            **session_tags
        }
        aws_tags = []
        for key, value in tags.items():
            aws_tags.append({
                'Key': key,
                'Value': value
            })
        metadata_http_tokens = self.context.config().get_string('virtual-desktop-controller.dcv_session.metadata_http_tokens', required=True)
        # Fall back to the AWS-managed EBS key when no customer key is configured.
        kms_key_id = self.context.config().get_string('cluster.ebs.kms_key_id', required=False, default=None)
        if kms_key_id is None:
            kms_key_id = 'alias/aws/ebs'
        # Handle subnet processing for eVDI hosts
        randomize_subnet_method = self.context.config().get_bool(
            'vdc.dcv_session.network.randomize_subnets',
            default=Utils.get_as_bool(constants.DEFAULT_VDI_RANDOMIZE_SUBNETS, default=False)
        )
        subnet_autoretry_method = self.context.config().get_bool(
            'vdc.dcv_session.network.subnet_autoretry',
            default=Utils.get_as_bool(constants.DEFAULT_VDI_SUBNET_AUTORETRY, default=True)
        )
        # Determine if we have a specific list of subnets configured for VDI
        configured_vdi_subnets = self.context.config().get_list(
            'vdc.dcv_session.network.private_subnets',
            default=[]
        )
        # Required=True as these should always be available, and we want to error otherwise
        cluster_private_subnets = self.context.config().get_list(
            'cluster.network.private_subnets',
            required=True
        )
        _attempt_subnets = []
        # Use a subnet_id if specified
        if Utils.is_not_empty(session.server.subnet_id):
            # this comes in as a string from the API
            self._logger.debug(f"Using strict requested subnet_id: {session.server.subnet_id}")
            _attempt_subnets.append(session.server.subnet_id)
        elif configured_vdi_subnets:
            # A list from the config
            self._logger.debug(f"Found configured VDI subnets: {', '.join(configured_vdi_subnets)}")
            _attempt_subnets = configured_vdi_subnets
        else:
            # fallback to a list of cluster private_subnets
            self._logger.debug(f"Fallback to cluster private_subnets: {', '.join(cluster_private_subnets)}")
            _attempt_subnets = cluster_private_subnets
        # Shuffle the list if configured for random subnet
        if randomize_subnet_method:
            random.shuffle(_attempt_subnets)
            self._logger.debug(f"Applying randomize to subnet list due to configuration.")
        # At this stage _attempt_subnets contains the subnets
        # we want to attempt in the order that we prefer
        # (ordered or pre-shuffled)
        placement = {
            "Tenancy": session.software_stack.placement.tenancy if session.software_stack.placement else VirtualDesktopTenancy.DEFAULT,
        }
        # Dedicated host tenancy requires either a specific host or a host resource group.
        if placement["Tenancy"] == VirtualDesktopTenancy.HOST:
            placement["Affinity"] = session.software_stack.placement.affinity
            if session.software_stack.placement.host_id:
                placement["HostId"] = session.software_stack.placement.host_id
            else:
                placement["HostResourceGroupArn"] = session.software_stack.placement.host_resource_group_arn
        self._logger.debug(f"Deployment Attempt Ready - Retry: {subnet_autoretry_method} Attempt_Subnets({len(_attempt_subnets)}): {', '.join(_attempt_subnets)}")
        _deployment_loop = 0
        _attempt_provision = True
        while _attempt_provision:
            _deployment_loop += 1
            # We just .pop(0) since the list has been randomized already if it was requested
            _subnet_to_try = _attempt_subnets.pop(0)
            _remainig_subnet_count = len(_attempt_subnets)
            self._logger.info(f"Deployment attempt #{_deployment_loop} subnet_id: {_subnet_to_try} Remaining Subnets: {_remainig_subnet_count}")
            if _remainig_subnet_count <= 0:
                # this is our last attempt
                self._logger.debug(f"Final deployment attempt (_remaining_subnet_count == 0)")
                _attempt_provision = False
            else:
                _attempt_provision = True
            response = None
            try:
                response = self.ec2_client.run_instances(
                    UserData=self._build_userdata(session),
                    ImageId=session.software_stack.ami_id,
                    InstanceType=session.server.instance_type,
                    TagSpecifications=[
                        {
                            'ResourceType': 'instance',
                            'Tags': aws_tags
                        }
                    ],
                    MaxCount=1,
                    MinCount=1,
                    NetworkInterfaces=[
                        {
                            'DeviceIndex': 0,
                            'AssociatePublicIpAddress': False,
                            'SubnetId': _subnet_to_try,
                            'Groups': session.server.security_groups
                        }
                    ],
                    IamInstanceProfile={
                        'Arn': session.server.instance_profile_arn
                    },
                    BlockDeviceMappings=[
                        {
                            'DeviceName': Utils.get_ec2_block_device_name(session.software_stack.base_os.value),
                            'Ebs': {
                                'DeleteOnTermination': True,
                                'VolumeSize': session.server.root_volume_size.int_val(),
                                'Encrypted': Utils.get_as_bool(constants.DEFAULT_VOLUME_ENCRYPTION_VDI, default=True),
                                'KmsKeyId': kms_key_id,
                                'VolumeType': Utils.get_as_string(constants.DEFAULT_VOLUME_TYPE_VDI, default='gp3')
                            }
                        }
                    ],
                    KeyName=session.server.key_pair_name,
                    HibernationOptions={
                        'Configured': False if Utils.is_empty(session.hibernation_enabled) else session.hibernation_enabled
                    },
                    MetadataOptions={
                        'HttpTokens': metadata_http_tokens,
                        'HttpEndpoint': 'enabled'
                    },
                    Placement=placement,
                )
            except Exception as err:
                self._logger.warning(f"Encountered Deployment Exception: {err} / Response: {response}")
                self._logger.debug(f"Remaining subnets {len(_attempt_subnets)}: {_attempt_subnets}")
                # Retry in the next subnet only when autoretry is on and attempts remain.
                if subnet_autoretry_method and _attempt_provision:
                    self._logger.debug(f"Continue with next attempt with remaining subnets..")
                    continue
                else:
                    self._logger.warning(f"Exhausted all deployment attempts. Cannot continue with this request.")
                    raise err  # session failure reason will be shown to user
            if response:
                self._logger.debug(f"Returning response: {response}")
                return Utils.to_dict(response)
    def _add_instance_data_to_cache(self):
        """Populate the long-term cache with all EC2 instance-type metadata.

        Pages through DescribeInstanceTypes (NextToken pagination) under the
        instance-types lock so only one caller performs the refresh; no-op if
        both cache entries are already present.
        """
        with self.instance_types_lock:
            instance_type_names = self.context.cache().long_term().get(self.INSTANCE_TYPES_NAMES_LIST_CACHE_KEY)
            instance_info_data = self.context.cache().long_term().get(self.INSTANCE_INFO_CACHE_KEY)
            if instance_type_names is None or instance_info_data is None:
                instance_type_names = []
                instance_info_data = {}
                has_more = True
                next_token = None
                while has_more:
                    if next_token is None:
                        result = self.context.aws().ec2().describe_instance_types(MaxResults=100)
                    else:
                        result = self.context.aws().ec2().describe_instance_types(MaxResults=100, NextToken=next_token)
                    next_token = Utils.get_value_as_string('NextToken', result)
                    has_more = Utils.is_not_empty(next_token)
                    current_instance_types = Utils.get_value_as_list('InstanceTypes', result)
                    for current_instance_type in current_instance_types:
                        instance_type_name = Utils.get_value_as_string('InstanceType', current_instance_type, '')
                        instance_type_names.append(instance_type_name)
                        instance_info_data[instance_type_name] = current_instance_type
                self.context.cache().long_term().set(self.INSTANCE_TYPES_NAMES_LIST_CACHE_KEY, instance_type_names)
                self.context.cache().long_term().set(self.INSTANCE_INFO_CACHE_KEY, instance_info_data)
def is_gpu_instance(self, instance_type: str) -> bool:
return self.get_gpu_manufacturer(instance_type) != VirtualDesktopGPU.NO_GPU
def get_instance_type_info(self, instance_type: str) -> Dict:
instance_types_data = self.context.cache().long_term().get(self.INSTANCE_INFO_CACHE_KEY)
if instance_types_data is None:
# not found in cache, need to update it again.
self._add_instance_data_to_cache()
instance_types_data = self.context.cache().long_term().get(self.INSTANCE_INFO_CACHE_KEY)
return instance_types_data[instance_type]
def get_instance_ram(self, instance_type: str) -> SocaMemory:
instance_info = self.get_instance_type_info(instance_type)
return SocaMemory(
value=Utils.get_value_as_float('SizeInMiB', Utils.get_value_as_dict('MemoryInfo', instance_info, {}), 0),
unit=SocaMemoryUnit.MiB
)
def get_architecture(self, instance_type: str) -> Optional[VirtualDesktopArchitecture]:
instance_info = self.get_instance_type_info(instance_type)
supported_archs = Utils.get_value_as_list('SupportedArchitectures', Utils.get_value_as_dict('ProcessorInfo', instance_info, {}), [])
for supported_arch in supported_archs:
if supported_arch == VirtualDesktopArchitecture.ARM64.value:
return VirtualDesktopArchitecture.ARM64
if supported_arch == VirtualDesktopArchitecture.X86_64.value:
return VirtualDesktopArchitecture.X86_64
def get_gpu_manufacturer(self, instance_type: str) -> VirtualDesktopGPU:
instance_info = self.get_instance_type_info(instance_type)
supported_gpus = Utils.get_value_as_list('Gpus', Utils.get_value_as_dict('GpuInfo', instance_info, {}), [])
if len(supported_gpus) == 0:
return VirtualDesktopGPU.NO_GPU
for supported_gpu in supported_gpus:
if Utils.get_value_as_string('Manufacturer', supported_gpu, '').lower() == VirtualDesktopGPU.NVIDIA.lower():
return VirtualDesktopGPU.NVIDIA
elif Utils.get_value_as_string('Manufacturer', supported_gpu, '').lower() == VirtualDesktopGPU.AMD.lower():
return VirtualDesktopGPU.AMD
return VirtualDesktopGPU.NO_GPU
def dedicated_hosts_supported(self, instance_type: str) -> bool:
instance_info = self.get_instance_type_info(instance_type)
return instance_info.get('DedicatedHostsSupported', False)
def validate_min_ram(self, instance_type_name: str, software_stack: Optional[VirtualDesktopSoftwareStack]) -> bool:
if software_stack and software_stack.min_ram > self.get_instance_ram(instance_type_name):
# this instance doesn't have the minimum ram required to support the software stack.
self._logger.debug(f"Software stack ({software_stack.name}) restrictions on RAM ({software_stack.min_ram}): Instance {instance_type_name} lacks enough ({self.get_instance_ram(instance_type_name)}). Skipped.")
return False
return True
    def get_valid_instance_types_by_allowed_list(self, hibernation_support: bool, allowed_instance_types: List[str]) -> Dict:
        """Filter all known EC2 instance types by allow/deny config.

        Entries in the allow/deny lists may be exact type names (contain a
        ``.``) or bare family prefixes. When *hibernation_support* is True,
        types without hibernation support are also excluded. Returns a dict of
        instance type name -> DescribeInstanceTypes entry.
        """
        instance_types_names = self.context.cache().long_term().get(self.INSTANCE_TYPES_NAMES_LIST_CACHE_KEY)
        instance_info_data = self.context.cache().long_term().get(self.INSTANCE_INFO_CACHE_KEY)
        if instance_types_names is None or instance_info_data is None:
            # not found in cache, need to update it again.
            self._add_instance_data_to_cache()
            instance_types_names = self.context.cache().long_term().get(self.INSTANCE_TYPES_NAMES_LIST_CACHE_KEY)
            instance_info_data = self.context.cache().long_term().get(self.INSTANCE_INFO_CACHE_KEY)
        # We now have a list of all instance types (Cache has been updated IF it was empty).
        valid_instance_types_dict = {}
        # Split the allow list into exact names ("m5.large") and families ("m5").
        allowed_instance_type_names = set()
        allowed_instance_type_families = set()
        for instance_type in allowed_instance_types:
            if '.' in instance_type:
                allowed_instance_type_names.add(instance_type)
            else:
                allowed_instance_type_families.add(instance_type)
        # Same split for the configured deny list.
        denied_instance_types = set(self.context.config().get_list('virtual-desktop-controller.dcv_session.instance_types.deny', default=[]))
        denied_instance_type_names = set()
        denied_instance_type_families = set()
        for instance_type in denied_instance_types:
            if '.' in instance_type:
                denied_instance_type_names.add(instance_type)
            else:
                denied_instance_type_families.add(instance_type)
        if self._logger.isEnabledFor(logging.DEBUG):
            self._logger.debug(f"get_valid_instance_types() - Instance Allow/Deny Summary")
            self._logger.debug(f"Allowed instance Families: {allowed_instance_type_families}")
            self._logger.debug(f"Allowed instances: {allowed_instance_type_names}")
            self._logger.debug(f"Denied instance Families: {denied_instance_type_families}")
            self._logger.debug(f"Denied instances: {denied_instance_type_names}")
        for instance_type_name in instance_types_names:
            instance_type_family = instance_type_name.split('.')[0]
            self._logger.debug(f"Processing - Instance Name: {instance_type_name} Family: {instance_type_family}")
            if instance_type_name not in allowed_instance_type_names and instance_type_family not in allowed_instance_type_families:
                # instance type or instance family is not present in allow list
                self._logger.debug(f"Found {instance_type_name} ({instance_type_family}) NOT in ALLOW config: ({allowed_instance_type_names} / {allowed_instance_type_families})")
                continue
            if instance_type_name in denied_instance_type_names or instance_type_family in denied_instance_type_families:
                # instance type or instance family is present in deny list
                self._logger.debug(f"Found {instance_type_name} ({instance_type_family}) IN DENIED config: ({denied_instance_type_names} / {denied_instance_type_families})")
                continue
            instance_info = instance_info_data[instance_type_name]
            hibernation_supported = Utils.get_value_as_bool('HibernationSupported', instance_info, default=False)
            if hibernation_support and not hibernation_supported:
                self._logger.debug(f"Hibernation ({hibernation_support}) != Instance {instance_type_name} ({hibernation_supported}) Skipped.")
                continue
            # All checks passed if we make it this far
            self._logger.debug(f"Instance {instance_type_name} - Added as valid_instance_types")
            valid_instance_types_dict[instance_type_name] = instance_info
        self._logger.debug(f"Returning valid_instance_types: {valid_instance_types_dict.keys()}")
        return valid_instance_types_dict
    def get_valid_instance_types_by_software_stack(self, hibernation_support: bool, software_stack: VirtualDesktopSoftwareStack = None, gpu: VirtualDesktopGPU = None) -> List[Dict]:
        """Return DescribeInstanceTypes entries compatible with a software stack.

        Starts from the configured allow list, then filters by stack-specific
        constraints: RHEL8/g4dn exclusion, minimum RAM, CPU architecture,
        Windows AMI boot mode, GPU requirement (explicit *gpu* overrides the
        stack's GPU), and dedicated-host tenancy support.
        """
        allowed_instance_types = self.context.config().get_list('virtual-desktop-controller.dcv_session.instance_types.allow', default=[])
        valid_instance_types_dict = self.get_valid_instance_types_by_allowed_list(hibernation_support, allowed_instance_types)
        valid_instance_types_names = []
        valid_instance_types = []
        for instance_type_name in valid_instance_types_dict.keys():
            instance_type_family = instance_type_name.split('.')[0]
            instance_info = valid_instance_types_dict[instance_type_name]
            self._logger.debug(f"Processing - Instance Name: {instance_type_name} Family: {instance_type_family}")
            if Utils.is_not_empty(software_stack) and software_stack.base_os == VirtualDesktopBaseOS.RHEL8 and instance_type_family == 'g4dn':
                # Disabling g4dn instances for RHEL8
                self._logger.debug(f"g4dn instances are disabled for RHEL8 instances")
                continue
            if not self.validate_min_ram(instance_type_name, software_stack):
                continue
            supported_archs = Utils.get_value_as_list('SupportedArchitectures', Utils.get_value_as_dict('ProcessorInfo', instance_info, {}), [])
            if Utils.is_not_empty(software_stack) and software_stack.architecture.value not in supported_archs:
                # not desired architecture
                self._logger.debug(f"Software Stack arch ({software_stack.architecture.value}) != Instance {instance_type_name} ({supported_archs}) Skipped.")
                continue
            # Windows AMIs must boot with a mode the instance type supports
            # ('uefi-preferred' images are flexible; unspecified defaults to legacy-bios).
            if software_stack and software_stack.base_os == VirtualDesktopBaseOS.WINDOWS:
                image_info = self.describe_image_id(software_stack.ami_id)
                instance_boot_modes = instance_info.get('SupportedBootModes', [])
                image_boot_mode = image_info.get('BootMode', '')
                if image_boot_mode:
                    if image_boot_mode != 'uefi-preferred' and image_boot_mode not in instance_boot_modes:
                        # image has boot mode specified and boot mode is not supported by instance
                        self._logger.debug(f"Software stack ({software_stack}) restrictions on BootMode ({image_boot_mode}): Instance {instance_type_name} doesn't support ({image_boot_mode}). Skipped.")
                        continue
                else:
                    if 'legacy-bios' not in instance_boot_modes:
                        # image has no boot mode specified and default legacy-bios is not supported by instance
                        self._logger.debug(f"Software stack ({software_stack}) restrictions on BootMode (legacy-bios): Instance {instance_type_name} doesn't support legacy-bios. Skipped.")
                        continue
            supported_gpus = Utils.get_value_as_list('Gpus', Utils.get_value_as_dict('GpuInfo', instance_info, {}), [])
            self._logger.debug(f"Instance {instance_type_name} GPU ({supported_gpus})")
            # An explicit gpu argument takes precedence over the stack's GPU setting.
            gpu_to_check_against = None
            if Utils.is_not_empty(gpu):
                gpu_to_check_against = gpu
            elif Utils.is_not_empty(software_stack):
                gpu_to_check_against = software_stack.gpu
            if gpu_to_check_against == VirtualDesktopGPU.NO_GPU:
                # we don't need GPU
                if len(supported_gpus) > 0:
                    # this instance SHOULD NOT have GPU support, but it does.
                    self._logger.debug(f"Instance {instance_type_name} Should not have GPU ({supported_gpus}) but it does.")
                    continue
            elif gpu_to_check_against is not None:
                # we need GPU
                gpu_found = False
                for supported_gpu in supported_gpus:
                    gpu_found = gpu_to_check_against.value.lower() == Utils.get_value_as_string('Manufacturer', supported_gpu, '').lower()
                    if gpu_found:
                        break
                if not gpu_found:
                    # we needed a GPU, but we didn't find any
                    self._logger.debug(f"Instance {instance_type_name} - Needed a GPU but didn't find one.")
                    continue
            if software_stack and software_stack.placement:
                if software_stack.placement.tenancy == VirtualDesktopTenancy.HOST and not self.dedicated_hosts_supported(instance_type_name):
                    self._logger.debug(f"Instance {instance_type_name} doesn't support tenancy {software_stack.placement.tenancy}. Skipped.")
                    continue
            # All checks passed if we make it this far
            self._logger.debug(f"Instance {instance_type_name} - Added as valid_instance_types for software_stack")
            valid_instance_types_names.append(instance_type_name)
            valid_instance_types.append(instance_info)
        self._logger.debug(f"Returning valid_instance_types for software_stack: {valid_instance_types_names}")
        return valid_instance_types
def describe_image_id(self, ami_id: str) -> dict:
try:
response = Utils.to_dict(self.ec2_client.describe_images(
ImageIds=[
ami_id
]
))
except ClientError as e:
self._logger.error(e)
return {}
images = Utils.get_value_as_list('Images', response, [])
for image in images:
return image
return {}
def create_image_for_instance_id(self, instance_id, image_name, image_description):
if Utils.is_empty(image_name):
image_name = f'RES-IMAGE-NAME-{instance_id}'
if Utils.is_empty(image_description):
image_description = f'RES-IMAGE-DESCRIPTION-{instance_id}'
response = Utils.to_dict(self.ec2_client.create_image(
Name=image_name,
Description=image_description,
InstanceId=instance_id
))
return response
def is_active_directory(self) -> bool:
provider = self.context.config().get_string('directoryservice.provider', required=True)
return provider in {constants.DIRECTORYSERVICE_AWS_MANAGED_ACTIVE_DIRECTORY, constants.DIRECTORYSERVICE_ACTIVE_DIRECTORY}
def change_instance_type(self, instance_id: str, instance_type_name: str) -> (str, bool):
if Utils.is_empty(instance_id) or Utils.is_empty(instance_type_name):
return f'Invalid {instance_id} or {instance_type_name}', False
self._logger.info(f'Changing instance type for {instance_id} to {instance_type_name}')
# TODO: check if server is already stopped.
try:
_ = self.ec2_client.modify_instance_attribute(
InstanceId=instance_id,
Attribute='instanceType',
Value=instance_type_name
)
except Exception as e:
self._logger.error(e)
return repr(e), False
return '', True