ebcli/controllers/migrate.py (1,858 lines of code) (raw):

# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import datetime import shutil import os import string import sys import re import xml.etree.ElementTree as ET from dataclasses import dataclass import zipfile from typing import Dict, List, Any, Union, Optional, Tuple, Set import collections import json import argparse if sys.platform.startswith("win"): import winreg import clr import win32com.client clr.AddReference("System.Reflection") clr.AddReference(r"C:\Windows\System32\inetsrv\Microsoft.Web.Administration.dll") clr.AddReference("System") clr.AddReference("System.Core") clr.AddReference("System.DirectoryServices.AccountManagement") from System.DirectoryServices.AccountManagement import ( PrincipalContext, ContextType, UserPrincipal, PrincipalSearcher, ) from System.Collections.Generic import HashSet, Queue from System.Reflection import Assembly from Microsoft.Web.Administration import ( ServerManager, Binding, Site, Application, ObjectState, ) from System.Diagnostics import Process, ProcessStartInfo from System.Runtime.InteropServices import COMException from cement.utils.misc import minimal_logger LOG = minimal_logger(__name__) from ebcli.core.abstractcontroller import AbstractBaseController from ebcli.core import io, fileoperations from ebcli.lib import utils, ec2, elasticbeanstalk, aws from ebcli.objects import requests from ebcli.objects.platform import PlatformVersion from ebcli.objects.exceptions import ( NotFoundError, NotAnEC2Instance, NotSupportedError, ) from ebcli.resources.strings import prompts, flag_text from ebcli.operations import commonops, createops, platformops, statusops from ebcli.operations.tagops import tagops from ebcli.resources.statics import namespaces class MigrateExploreController(AbstractBaseController): class Meta: argument_formatter = argparse.RawTextHelpFormatter label = "explore" description = flag_text["migrate.explore"] usage = "eb migrate explore" stacked_on = "migrate" stacked_type = "nested" def do_command(self): if not sys.platform.startswith("win"): raise NotSupportedError("'eb migrate explore' is only supported on Windows") verbose = self.app.pargs.verbose if verbose: list_sites_verbosely() else: io.echo("\n".join([s.Name for s in ServerManager().Sites])) class MigrateCleanupController(AbstractBaseController): class Meta: argument_formatter = argparse.RawTextHelpFormatter label = "cleanup" description = flag_text["migrate.cleanup"] usage = "eb migrate cleanup" stacked_on = "migrate" stacked_type = "nested" arguments = [ (["--force"], dict(action="store_true", help=flag_text["migrate.force"])), ] def do_command(self): if not sys.platform.startswith("win"): raise NotSupportedError("'eb migrate cleanup' is only supported on Windows") force = self.app.pargs.force cleanup_previous_migration_artifacts(force, self.app.pargs.verbose) # TODO: error when a physical path is in incidental to the migration execution path class MigrateController(AbstractBaseController): class Meta: argument_formatter = argparse.RawTextHelpFormatter label = "migrate" description = "This command migrates an IIS site or application from a source Windows machine to an environment hosted on AWS Elastic Beanstalk." usage = "eb migrate [options ...]" arguments = [ (["-s", "--sites"], dict(help=flag_text["migrate.sites"])), ( ["-e", "--environment-name"], dict(help=flag_text["migrate.environment_name"]), ), ( ["-a", "--application-name"], dict(help=flag_text["migrate.application_name"]), ), (["-p", "--platform"], dict(help=flag_text["migrate.platform"])), (["-i", "--instance-type"], dict(help=flag_text["migrate.instance_type"])), (["-c", "--cname"], dict(help=flag_text["migrate.cname"])), ( ["-ip", "--instance-profile"], dict(help=flag_text["migrate.instance_profile"]), ), (["-sr", "--service-role"], dict(help=flag_text["migrate.service_role"])), ( ["-es", "--ebs-snapshots"], dict(nargs="*", help=flag_text["migrate.ebs_snapshots"]), ), ( ["-st", "--stream-to-cloudwatch"], dict(action="store_true", help=argparse.SUPPRESS), ), ( ["-hc", "--use-host-ebs-configuration"], dict(action="store_true", help=argparse.SUPPRESS), ), (["-k", "--keyname"], dict(help=flag_text["migrate.keyname"])), ( ["-in", "--interactive"], dict(action="store_true", help=flag_text["migrate.interactive"]), ), (["-t", "--tags"], dict(help=flag_text["migrate.tags"])), (["-d", "--copy-deps"], dict(action="store_true", help=argparse.SUPPRESS)), ( ["-ao", "--archive-only"], dict(action="store_true", help=flag_text["migrate.archive_only"]), ), ( ["-op", "--on-prem-mode"], dict(action="store_true", help=argparse.SUPPRESS), ), ( ["-cf", "--copy-firewall-config"], dict( action="store_true", help=flag_text["migrate.copy_firewall_config"] ), ), ( ["--encrypt-ebs-volumes"], dict( action="store_true", help=flag_text["migrate.encrypt_ebs_volumes"] ), ), ( ["--ssl-certificates"], dict(help=flag_text["migrate.ssl_certificate_arns"]), ), (["--archive"], dict(help=flag_text["migrate.archive"])), (["-vpc", "--vpc-config"], dict(help=flag_text["migrate.vpc_config"])), # TODO: support userdata copy using robocopy ] def generate_ms_deploy_source_bundle( self, site: "Site", destination: str, verbose: bool, additional_virtual_dir_physical_paths: List[str] = [], ) -> None: """ Generate deployment bundle and manifest for an IIS site and its components. Creates a structured directory containing deployment artifacts for an IIS site, including all its applications and virtual directories. Maintains a deployment manifest that describes how to deploy these components. Args: site: IIS Site object to package destination: Base directory for deployment artifacts verbose: If True, provides detailed output during generation additional_virtual_dir_physical_paths: List to collect physical paths of virtual directories for permission configuration Directory Structure: destination/ ├── upload_target/ │ ├── source1.zip # Application bundles │ ├── source2.zip │ ├── aws-windows-deployment-manifest.json │ └── ebmigrateScripts/ # Helper PowerShell scripts │ ├── site_installer.ps1 │ ├── permission_handler.ps1 │ └── other helper scripts └── upload_target.zip # Final package Process Flow: 1. Creates upload_target and ebmigrateScripts directories 2. Creates or updates deployment manifest 3. For each application in site: - Checks for password protection - Generates MS Deploy package - Collects non-root virtual directory paths 4. Updates manifest with final configuration Manifest Structure: { "manifestVersion": 1, "deployments": { "msDeploy": [], # Default Web Site deployments "custom": [] # Custom site deployments } } Notes: - Creates directories with exist_ok=True - Collects virtual directory paths for later permission setup - Updates existing manifest if found, creates new if not - Uses indented JSON format for manifest readability """ if verbose: io.echo(f"Generating source bundle for {site.Name}") upload_target_dir = os.path.join(destination, "upload_target") os.makedirs(upload_target_dir, exist_ok=True) os.makedirs(os.path.join(upload_target_dir, "ebmigrateScripts"), exist_ok=True) manifest_file_path = os.path.join( upload_target_dir, "aws-windows-deployment-manifest.json" ) relative_normalized_manifest_path = absolute_to_relative_normalized_path( manifest_file_path ) if os.path.exists(manifest_file_path): if verbose: io.echo(f" Updating {relative_normalized_manifest_path}") with open(manifest_file_path) as file: manifest_contents = json.load(file) else: manifest_contents = { "manifestVersion": 1, "deployments": {"msDeploy": [], "custom": []}, } for application in site.Applications: warn_about_password_protection(site, application) ms_deploy_sync_application( site, application, destination, upload_target_dir, manifest_contents ) for vdir in application.VirtualDirectories: if vdir.Path != "/": additional_virtual_dir_physical_paths.append(vdir.PhysicalPath) if verbose: io.echo( f"Updating manifest file for archive at {relative_normalized_manifest_path}" ) with open(manifest_file_path, "w") as file: json.dump(manifest_contents, file, indent=4) def do_command(self): if not sys.platform.startswith("win"): raise NotSupportedError("'eb migrate' is only supported on Windows") validate_iis_version_greater_than_7_0() verbose = self.app.pargs.verbose site_names = self.app.pargs.sites env_name = self.app.pargs.environment_name app_name = self.app.pargs.application_name platform = self.app.pargs.platform instance_type = self.app.pargs.instance_type instance_profile = self.app.pargs.instance_profile service_role = self.app.pargs.service_role ebs_snapshots = self.app.pargs.ebs_snapshots keyname = self.app.pargs.keyname interactive = self.app.pargs.interactive cname = self.app.pargs.cname region = self.app.pargs.region archive_only = self.app.pargs.archive_only on_prem_mode = self.app.pargs.on_prem_mode tags = self.app.pargs.tags tags = tagops.get_and_validate_tags(tags) copy_firewall_config = self.app.pargs.copy_firewall_config encrypt_ebs_volumes = self.app.pargs.encrypt_ebs_volumes ssl_certificate = self.app.pargs.ssl_certificates archive = self.app.pargs.archive if archive and archive_only: raise ValueError("Cannot use --archive-only with --archive-dir together.") vpc_config = self.app.pargs.vpc_config sites = establish_candidate_sites(site_names, interactive) on_an_ec2_instance = True try: environment_vpc, _region, instance_id, instance_tags = ( construct_environment_vpc_config(on_prem_mode, verbose) ) on_an_ec2_instance = not not instance_id except NotAnEC2Instance: environment_vpc, _region, instance_id, instance_tags = ( dict(), None, list(), None, ) on_an_ec2_instance = False if vpc_config: environment_vpc = load_environment_vpc_from_vpc_config(vpc_config) region = _region or establish_region(region, interactive, app_name, platform) LOG.debug("Writing region_name to .elasticbeanstalk/config") fileoperations.write_config_setting("global", "region_name", region) tags = tags or instance_tags snapshots_string = generate_snapshots_string(ebs_snapshots) app_name = establish_app_name(app_name, interactive, sites) env_name = establish_env_name(env_name, app_name, interactive, sites) platform = establish_platform(platform, interactive) process_keyname(keyname) listener_configs = [] if not _arr_enabled(): listener_configs = get_listener_configs(sites, ssl_certificate) all_ports = get_all_ports(sites) ec2_security_group = None load_balancer_security_group = None if on_an_ec2_instance and copy_firewall_config: load_balancer_security_group, ec2_security_group = ( ec2.establish_security_group(all_ports, env_name, environment_vpc["id"]) ) source_bundle_zip = None upload_target_dir = None latest_migration_run_path = None if not archive: latest_migration_run_path = setup_migrations_dir(verbose) upload_target_dir = os.path.join(latest_migration_run_path, "upload_target") os.makedirs(upload_target_dir, exist_ok=True) self.package_sites( sites, latest_migration_run_path, upload_target_dir, verbose ) write_ebdeploy_utility_script(upload_target_dir) if _arr_enabled(): export_arr_config(upload_target_dir, verbose) if copy_firewall_config: write_copy_firewall_config_script(upload_target_dir, sites) fileoperations.zip_up_folder(upload_target_dir, upload_target_zip_path()) else: if zipfile.is_zipfile(archive): source_bundle_zip = archive else: upload_target_dir = archive latest_migration_run_path = os.path.dirname(upload_target_dir) self.package_sites( sites, latest_migration_run_path, upload_target_dir, verbose ) fileoperations.zip_up_folder( upload_target_dir, upload_target_zip_path() ) if listener_configs and latest_migration_run_path: with open( os.path.join(latest_migration_run_path, "listener_configs.json"), "w" ) as file: listener_configs_json = {"listener_configs": listener_configs} json.dump(listener_configs_json, file, indent=2) if archive_only and upload_target_dir: generate_upload_target_archive(upload_target_dir, env_name, region) return self.create_app_version_and_environment( app_name=app_name, source_bundle_zip=source_bundle_zip, instance_profile=instance_profile, service_role=service_role, instance_type=instance_type, cname=cname, env_name=env_name, encrypt_ebs_volumes=encrypt_ebs_volumes, environment_vpc=environment_vpc, ec2_security_group=ec2_security_group, platform=platform, keyname=keyname, tags=tags, snapshots_string=snapshots_string, listener_configs=listener_configs, load_balancer_security_group=load_balancer_security_group, interactive=interactive, ) # ------------------------------------------------------------------------------- # proceed to create application version and the EB environment beyond this point # ------------------------------------------------------------------------------- def create_app_version_and_environment( self, app_name, source_bundle_zip, instance_profile, service_role, instance_type, cname, env_name, encrypt_ebs_volumes, environment_vpc, ec2_security_group, platform, keyname, tags, snapshots_string, listener_configs, load_balancer_security_group, interactive, ): version_label = commonops.create_app_version( app_name, source_bundle=source_bundle_zip or upload_target_zip_path() ) instance_profile = establish_instance_profile(instance_profile) if not service_role: service_role = createops.create_default_service_role() instance_type = instance_type or "c5.2xlarge" cname = cname or get_unique_cname(env_name) if encrypt_ebs_volumes: do_encrypt_ebs_volumes() if environment_vpc and environment_vpc.get("securitygroups"): vpc_security_groups = set(environment_vpc["securitygroups"].split(",")) if ec2_security_group: vpc_security_groups.add(ec2_security_group.get("Value", set())) environment_vpc["securitygroups"] = ",".join(list(vpc_security_groups)) ec2_security_group = None root_volume = [ { "Namespace": namespaces.LAUNCH_CONFIGURATION, "OptionName": "RootVolumeSize", "Value": "60", } ] env_request = requests.CreateEnvironmentRequest( app_name=app_name, env_name=env_name, platform=platform, version_label=version_label, instance_profile=instance_profile, service_role=service_role, key_name=keyname, tags=tags, vpc=environment_vpc, elb_type="application", instance_types=instance_type, min_instances="1", max_instances="4", block_device_mappings=snapshots_string, listener_configs=listener_configs, cname=cname, description="Environment created by `eb migrate`", load_balancer_security_group=load_balancer_security_group, ec2_security_group=ec2_security_group, root_volume=root_volume, ) createops.make_new_env(env_request, interactive=interactive, timeout=15) def package_sites( self, sites: List["Site"], latest_migration_run_path: str, upload_target_dir: str, verbose: bool, ) -> None: """ Package IIS sites and their components for deployment. Creates deployment bundles for specified IIS sites, including their applications and virtual directories. Generates necessary PowerShell scripts for deployment and permission management. Args: sites: List of IIS Site objects to package latest_migration_run_path: Path to store migration artifacts upload_target_dir: Directory for deployment scripts and bundles verbose: If True, provides detailed output during packaging Process Flow: 1. Announces sites being packaged (if not verbose) 2. Generates MS Deploy bundles for each site 3. Creates necessary PowerShell scripts: - noop.ps1 for placeholder operations - Virtual directory permission script (if needed) 4. Updates manifest with virtual directory configurations Notes: - Tracks additional virtual directory paths across all sites - Creates permission management scripts only if virtual directories exist - Uses MS Deploy for package generation - Maintains list of physical paths requiring special permissions Example Output (non-verbose): Generating source bundle for sites, applications, and virtual directories: [Site1, Site2] """ additional_virtual_dir_physical_paths = [] if not verbose: command_separated_sites_list = ", ".join([s.Name for s in sites]) io.echo( f"Generating source bundle for sites, applications, and virtual directories: [{command_separated_sites_list}]" ) for site in sites: self.generate_ms_deploy_source_bundle( site, destination=latest_migration_run_path, verbose=verbose, additional_virtual_dir_physical_paths=additional_virtual_dir_physical_paths, ) create_noop_ps1_script(upload_target_dir) if additional_virtual_dir_physical_paths: create_virtualdir_path_permission_script( additional_virtual_dir_physical_paths, upload_target_dir ) add_virtual_directory_custom_script_to_manifest(upload_target_dir) def get_all_ports(sites): all_ports = set() for site in sites: for binding in site.Bindings: all_ports.add(int(binding.get_BindingInformation().split(":")[1])) return all_ports def get_unique_non_80_ports(sites): all_ports = set() for site in sites: bindings = site.Bindings for binding in bindings: port = binding.BindingInformation.split(":")[1] if port != "80": all_ports.add(port) return all_ports def absolute_to_relative_normalized_path(abs_path): relative_path = os.path.relpath(abs_path, os.getcwd()) path_parts = relative_path.split(os.sep) # Split path into components for i, part in enumerate(path_parts): if part.startswith("migration_"): path_parts[i] = "latest" return os.path.join(*path_parts) def do_encrypt_ebs_volumes(): try: ec2.enable_ebs_volume_encryption() except Exception as e: io.log_error(f"Failed to enable EBS volume encryption: {e}") raise e def establish_instance_profile(instance_profile): instance_profile = instance_profile or commonops.create_default_instance_profile() fileoperations.write_config_setting("global", "instance_profile", instance_profile) return instance_profile def generate_upload_target_archive(upload_target_dir, env_name, region): fileoperations.zip_up_folder(upload_target_dir, upload_target_zip_path()) relative_normalized_upload_target_dir_path = absolute_to_relative_normalized_path( upload_target_dir ) try: test_environment_exists(env_name) io.echo( f"\nGenerated destination archive ZIP at .\\{relative_normalized_upload_target_dir_path}.zip. " "You can now upload the zip using:\n\n" f" eb deploy {env_name} --archive .\\migrations\\latest\\upload_target.zip --region {region}\n" ) except NotFoundError: io.echo( f"\nGenerated destination archive directory at .\\{relative_normalized_upload_target_dir_path}.zip. " "You can create an environment with the zip using:\n\n" f" eb migrate --environment-name {env_name} --archive .\\migrations\\latest\\upload_target.zip --region {region}\n" ) def test_environment_exists(env_name): elasticbeanstalk.get_environment(env_name=env_name) def upload_target_zip_path(): return os.path.join(os.getcwd(), "migrations", "latest", "upload_target.zip") def add_virtual_directory_custom_script_to_manifest(upload_target_dir): manifest_file_path = os.path.join( upload_target_dir, "aws-windows-deployment-manifest.json" ) if os.path.exists(manifest_file_path): with open(manifest_file_path) as file: manifest_contents = json.load(file) else: manifest_contents = { "manifestVersion": 1, "deployments": {"msDeploy": [], "custom": []}, } manifest_contents["deployments"]["custom"].append( create_custom_manifest_section( "FixVirtualDirPermissions", "add_virtual_dir_read_access.ps1", "noop.ps1", "noop.ps1", ) ) with open(manifest_file_path, "w") as file: json.dump(manifest_contents, file, indent=4) def process_keyname(keyname): if keyname: commonops.upload_keypair_if_needed(keyname) LOG.debug("Writing default_ec2_keyname to .elasticbeanstalk/config") fileoperations.write_config_setting("global", "default_ec2_keyname", keyname) def establish_platform(platform, interactive): if not platform and interactive: platform = platformops.prompt_for_platform() elif not platform: io.echo("Determining EB platform based on host machine properties") platform = _determine_platform(platform_string=get_windows_server_version()) else: io.echo(f"Determining EB platform based on input, {platform}") platform = _determine_platform(platform) LOG.debug("Writing platform_name to .elasticbeanstalk/config") fileoperations.write_config_setting("global", "platform_name", platform.name) return platform def establish_env_name(env_name, app_name, interactive, sites): if not env_name and interactive: env_name = get_environment_name(app_name) elif not env_name: LOG.debug("Setting env_name to site_name with whitespaces removed") if len(sites) == 1: candidate_env_name = sites[0].Name.replace(" ", "") else: candidate_env_name = "EBMigratedEnv" env_name = get_unique_environment_name(candidate_env_name) return env_name def establish_app_name(app_name, interactive, sites): if not app_name and interactive: app_name = _get_application_name_interactive() elif not app_name: LOG.debug("Setting app_name to site_name with whitespaces removed") if len(sites) == 1: app_name = sites[0].Name.replace(" ", "") else: app_name = "EBMigratedApp" LOG.debug("Writing application_name to .elasticbeanstalk/config") fileoperations.write_config_setting("global", "application_name", app_name) return app_name def generate_snapshots_string(ebs_snapshots): snapshots_string = [] if ebs_snapshots: char_iter = iter(string.ascii_lowercase) io.echo(f"Using input EBS snapshot configuration: {snapshots_string}") snapshots_string = ",".join( [f"/dev/sd{next(char_iter)}={snapshot}" for snapshot in ebs_snapshots] ) return snapshots_string def establish_region(region, interactive, app_name, platform): if not region and interactive: region = commonops.get_region(None, True) elif not region: region = commonops.get_region_force_non_interactive(platform) aws.set_region(region) fileoperations.create_config_file( app_name=app_name, region=region, solution_stack=platform, workspace_type="Application", ) return region def establish_candidate_sites( site_names: Optional[str], interactive: bool ) -> List["Site"]: """ Determine which IIS sites to include in the migration process. Resolves the list of IIS sites to migrate based on input parameters and available sites. Sites can be specified explicitly, chosen interactively, or determined automatically based on the presence of Default Web Site. Args: site_names: Comma-separated string of site names to migrate. If None, uses interactive or default behavior. interactive: If True and site_names is None, prompts user to select a site from available options. Returns: List of IIS Site objects to be migrated Selection Logic: 1. If site_names provided: - Validates all specified sites exist - Returns corresponding Site objects 2. If interactive and no site_names: - Prompts user to select one site - Returns list with selected site 3. If non-interactive and no site_names: - If Default Web Site exists, returns all sites - Otherwise, raises error Raises: ValueError: If specified site name doesn't exist EnvironmentError: If no sites specified in non-interactive mode and Default Web Site doesn't exist Example: >>> # Explicit selection >>> sites = establish_candidate_sites("Site1,Site2", False) >>> # Interactive selection >>> sites = establish_candidate_sites(None, True) >>> # Default behavior >>> sites = establish_candidate_sites(None, False) """ server_manager = ServerManager() if not server_manager.Sites: raise ValueError( "`eb migrate` failed because there are no sites on this IIS server." ) available_sites = [s.Name for s in server_manager.Sites] if site_names: site_names = site_names.split(",") if site_names: for site_name in site_names: if site_name not in available_sites: raise ValueError( f"Specified site, '{site_name}', does not exist. Available sites: [{', '.join(available_sites)}]" ) sites = server_manager.Sites elif not site_names and interactive: io.echo("Select an IIS site to migrate:") site_name = utils.prompt_for_item_in_list( [s.Name for s in server_manager.Sites], default="1" ) site = [s for s in server_manager.Sites if s.Name == site_name][0] sites = [site] else: sites = server_manager.Sites if not sites: raise EnvironmentError( "`eb migrate` failed because there are no sites on this IIS server." ) return sites def list_sites_verbosely(): # TODO: Show URL rewrites and proxy information for i, site in enumerate(ServerManager().Sites, 1): io.echo(f"{i}: {site.Name}:") io.echo(f" - Bindings:") for binding in site.Bindings: io.echo(f" - {binding.BindingInformation}") for application in site.Applications: io.echo(f" - Application '{application.Path}':") io.echo(f" - Application Pool: {application.ApplicationPoolName}") io.echo(f" - Enabled Protocols: {application.EnabledProtocols}") io.echo(f" - Virtual Directories:") virdirs = application.VirtualDirectories for vdir in virdirs: io.echo(f" - {vdir.Path}:") io.echo(f" - Physical Path: {vdir.PhysicalPath}") io.echo(f" - Logon Method: {vdir.LogonMethod}") if vdir.UserName: io.echo(f" - Username: {vdir.UserName}") if vdir.Password: io.echo(" - Password: <redacted>") try: users = get_local_users() except: return io.echo("----------------------------------------------------") io.echo("Users:") for username, homedir in users: io.echo(f" - {username}") io.echo(f" - Home: {homedir}") def get_local_users(): ctx = PrincipalContext(ContextType.Machine) user_principal = UserPrincipal(ctx) searcher = PrincipalSearcher(user_principal) users = searcher.FindAll() user_list = [ (user.SamAccountName, user.HomeDirectory) for user in users if user.Enabled ] return user_list def load_environment_vpc_from_vpc_config(vpc_config: str) -> Dict[str, any]: """ Load and validate VPC configuration from either a JSON file or JSON string. Parses VPC configuration from either a .json file or a JSON-formatted string, validates required fields, and provides defaults for optional parameters. Args: vpc_config: Either: - Path to a JSON file (must end in .json) - JSON-formatted string containing VPC configuration Returns: Dictionary containing VPC configuration with keys: - id: (required) VPC ID - publicip: (optional) Whether to assign public IPs, default True - elbscheme: (optional) ELB scheme, default "public" - ec2subnets: (optional) List of EC2 subnet IDs, default [] - securitygroups: (optional) Comma-separated security group IDs, default "" - elbsubnets: (optional) List of ELB subnet IDs, default [] Raises: FileNotFoundError: If vpc_config is a file path and: - File doesn't exist - File can't be opened - File is a directory - Permission denied ValueError: If: - JSON parsing fails - Required 'id' field is missing - Invalid JSON format in file or string Example JSON Format: { "id": "vpc-1234567890abcdef0", "publicip": true, "elbscheme": "public", "ec2subnets": ["subnet-123", "subnet-456"], "securitygroups": ["sg-123", "sg-456"], "elbsubnets": ["subnet-789", "subnet-abc"] } """ if vpc_config.endswith(".json"): try: with open(os.path.join(vpc_config)) as file: vpc_config_dict = json.load(file) except FileNotFoundError | PermissionError | IsADirectoryError | OSError: raise FileNotFoundError( f"Cannot open file {vpc_config} to parse VPC config. Verify that it exists and contains valid JSON." ) except json.JSONDecodeError: raise ValueError( f"Cannot parse {vpc_config}. Verify that it is a valid JSON file." ) else: try: vpc_config_dict = json.loads(vpc_config) except json.JSONDecodeError: raise ValueError( f"Cannot parse VPC config: {vpc_config}. Verify that it is a valid JSON string." ) try: vpc_config_dict["id"] except KeyError: raise ValueError(f"Must specify a VPC ID in VPC config file '{vpc_config}'") return { "id": vpc_config_dict["id"], "publicip": vpc_config_dict.get("publicip", True), "elbscheme": vpc_config_dict.get("elbscheme", "public"), "ec2subnets": vpc_config_dict.get("ec2subnets", []), "securitygroups": "".join(vpc_config_dict.get("securitygroups", [])), "elbsubnets": vpc_config_dict.get("elbsubnets", []), } def construct_environment_vpc_config( on_prem_mode: bool, verbose: bool ) -> Tuple[Dict[str, str], Optional[str], Optional[str], List[Dict[str, str]]]: """ Detect and construct VPC configuration from current EC2 instance or handle on-premises scenario. Attempts to gather VPC configuration from the current EC2 instance, including subnets, security groups, and tags. Falls back to empty configuration if running on-premises or if EC2 detection fails. Args: on_prem_mode: If True, skip EC2 detection and return empty configuration verbose: If True, print detailed VPC configuration information Returns: Tuple containing: - Dict[str, str]: VPC configuration with keys: * id: VPC ID * publicip: Always 'true' * elbscheme: Always 'public' * ec2subnets: Comma-separated list of first 3 subnet IDs * securitygroups: Comma-separated list of security group IDs * elbsubnets: Same as ec2subnets - Optional[str]: AWS region of instance, or None if not on EC2 - Optional[str]: Instance ID, or None if not on EC2 - List[Dict[str, str]]: Instance tags, excluding AWS system tags Notes: - Uses interleaved AZ subnet selection for high availability - Only includes first 3 subnets for EC2 and ELB - Filters out system tags (elasticbeanstalk:*, aws:*, Name) - Returns empty VPC config if: * on_prem_mode is True * Not running on EC2 * EC2 metadata access fails Example Output (verbose=True): Identifying VPC configuration of this EC2 instance (i-1234567890abcdef0): id: vpc-1234567890abcdef0 publicip: true elbscheme: public ec2subnets: subnet-123,subnet-456,subnet-789 securitygroups: sg-123,sg-456 elbsubnets: subnet-123,subnet-456,subnet-789 """ environment_vpc = dict() region = None tags = [] instance_id = None current_instance_details = ec2.get_current_instance_details() try: if on_prem_mode: raise NotAnEC2Instance("Pretend this is an on-prem instance") instance_id = current_instance_details["InstanceId"] _vpc = current_instance_details["VpcId"] security_groups = current_instance_details["SecurityGroupIds"] subnets = ",".join(ec2.list_subnets_azs_interleaved(_vpc)[:3]) region = current_instance_details["Region"] environment_vpc = { "id": _vpc, "publicip": "true", "elbscheme": "public", "ec2subnets": subnets, "publicip": "true", "securitygroups": ",".join(security_groups), "elbsubnets": subnets, } io.echo(f"Identifying VPC configuration of this EC2 instance ({instance_id}):") if verbose: for key, value in environment_vpc.items(): io.echo(f" {key}: {value}") tags = [ tag for tag in current_instance_details["Tags"] if not ( tag["Key"].startswith("elasticbeanstalk:") or tag["Key"].startswith("aws:") or tag["Key"] == "Name" ) ] except NotAnEC2Instance: raise except Exception: io.echo( f"Unable to detect EC2 configuration. Possibly executing on a non-EC2 instance" ) pass return environment_vpc, region, instance_id, tags def _determine_platform(platform_string=None): if not platform_string: platform_string = platformops.get_configured_default_platform() if platform_string: platform = platformops.get_platform_for_platform_string(platform_string) else: raise ValueError( f"Couldn't identify a platform based on hint: {platform_string}" ) if isinstance(platform, PlatformVersion): platform.hydrate(elasticbeanstalk.describe_platform_version) statusops.alert_platform_status(platform) return platform def setup_migrations_dir(verbose: bool) -> str: """ Create and configure a timestamped migration directory structure. Creates a migrations directory with a timestamped subdirectory for the current migration run, and sets up a 'latest' symlink pointing to it. The directory structure is used to store migration artifacts, logs, and deployment files. Args: verbose: If True, prints detailed information about log file locations and directory purposes Returns: str: Absolute path to the newly created migration directory Directory Structure: migrations/ ├── latest -> migration_[timestamp]/ (symlink) └── migration_[timestamp]/ ├── application.log (msbuild.exe logs) ├── error.log (msbuild.exe errors) └── upload_target/ (deployment artifacts) Notes: - Creates 'migrations' directory in current working directory if it doesn't exist - Generates unique directory name using UTC timestamp - Updates 'latest' symlink to point to new directory - Preserves original working directory - Creates directories with exist_ok=True to handle race conditions Example Output: Using .\\migrations\\migration_1708445678.123456 to contain artifacts for this migration run. If verbose: .\\migrations\\migration_1708445678.123456\\application.log -> msbuild.exe application logs .\\migrations\\migration_1708445678.123456\\error.log -> msbuild.exe error logs .\\migrations\\migration_1708445678.123456\\upload_target\\ -> destination archive dir """ migrations_dir = "migrations" migrations_dir_path = os.path.join(os.getcwd(), migrations_dir) cwd = os.getcwd() os.makedirs(migrations_dir_path, exist_ok=True) os.chdir(migrations_dir_path) latest_migration_dir_name = "migration_" + str( datetime.datetime.utcnow().timestamp() ) os.makedirs(latest_migration_dir_name) if os.path.exists("latest"): os.unlink("latest") os.symlink(latest_migration_dir_name, "latest", target_is_directory=True) os.chdir(cwd) latest_migration_run_path = os.path.join( migrations_dir_path, latest_migration_dir_name ) relative_path = os.path.relpath(latest_migration_run_path, os.curdir) relative_normalized_path = absolute_to_relative_normalized_path(relative_path) io.echo( f"Using .\\{relative_normalized_path} to contain artifacts for this migration run." ) if verbose: io.echo( f" .\\{relative_normalized_path}\\application.log -> msbuild.exe application logs" ) io.echo(f" .\\{relative_normalized_path}\\error.log -> msbuild.exe error logs") io.echo( f" .\\{relative_normalized_path}\\upload_target\\ -> destination archive directory" ) return latest_migration_run_path def get_environment_name(app_name): return io.prompt_for_environment_name(get_unique_environment_name(app_name)) def get_unique_environment_name(app_name): default_name = app_name + "-dev" current_environments = elasticbeanstalk.get_all_environment_names() return utils.get_unique_name(default_name, current_environments) def _get_application_name_interactive(): app_list = elasticbeanstalk.get_application_names() file_name = fileoperations.get_current_directory_name() new_app = False if len(app_list) > 0: io.echo() io.echo("Select an application to use") new_app_option = "[ Create new Application ]" app_list.append(new_app_option) try: default_option = app_list.index(file_name) + 1 except ValueError: default_option = len(app_list) app_name = utils.prompt_for_item_in_list(app_list, default=default_option) if app_name == new_app_option: new_app = True if len(app_list) == 0 or new_app: io.echo() io.echo("Enter Application Name") unique_name = utils.get_unique_name(file_name, app_list) app_name = io.prompt_for_unique_name(unique_name, app_list) return app_name def get_registry_value(key, subkey, value): try: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, subkey) return winreg.QueryValueEx(key, value)[0] except WindowsError: return None, None def get_windows_product_name(): product_name = get_registry_value( winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows NT\CurrentVersion", "ProductName", ) return product_name def get_windows_server_version(): product_name = get_windows_product_name() return product_name.replace(" Datacenter", "") def get_unique_cname(env_name): """ Derive a unique CNAME for a new environment based on the environment name :param env_name: name of the environment directory :return: A unique CNAME for a new environment """ cname = env_name tried_cnames = [] while not elasticbeanstalk.is_cname_available(cname): tried_cnames.append(cname) utils.sleep(0.5) cname = utils.get_unique_name(cname, tried_cnames) return cname def get_iis_version_from_registry() -> str: """ Retrieve the IIS version from the Windows registry. Returns: str: The IIS version number. Raises: OSError: If unable to access the registry or retrieve the IIS version. """ try: with winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\InetStp" ) as key: version = winreg.QueryValueEx(key, "VersionString")[0] return version.split()[-1] except (OSError, IndexError) as e: raise OSError( "Unable to retrieve IIS version from Windows registry. " "Please ensure that IIS (version 7.0 or later) is installed " "and that you have sufficient permissions to access the registry." ) from e def validate_iis_version_greater_than_7_0() -> None: """ Validate that the installed IIS version is 7.0 or later. Raises: EnvironmentError: If the IIS version is less than 7.0 or cannot be determined. """ try: iis_version = float(get_iis_version_from_registry()) if iis_version < 7.0: raise EnvironmentError( f"IIS version {iis_version} is not supported. " "Please upgrade to IIS version 7.0 or later." ) except ValueError as e: raise EnvironmentError( "Unable to determine IIS version. " "Please ensure that IIS (version 7.0 or later) is properly installed." ) from e def get_all_assemblies(root_assembly): visited = HashSet[str]() queue = Queue[Assembly]() queue.Enqueue(root_assembly) assemblies = [] while queue.Count > 0: current_asm = queue.Dequeue() if not visited.Contains(current_asm.FullName): visited.Add(current_asm.FullName) assemblies.append(current_asm) # Enumerate references of the current assembly for ref_name in current_asm.GetReferencedAssemblies(): try: ref_asm = Assembly.Load(ref_name) if ref_asm is not None and not visited.Contains(ref_asm.FullName): queue.Enqueue(ref_asm) except Exception as e: # Handle cases where an assembly fails to load LOG.debug( f"Could not load referenced assembly {ref_name.Name}: {e}" ) io.log_warning(f"Could not load {ref_name.Name}.") return assemblies def copy_assemblies_into_bin(bin_path: str, site_name: str) -> None: root_assembly = Assembly.LoadFrom(os.path.join(bin_path, f"{site_name}.dll")) LOG.debug(f"Transitive dependencies: ") for asm in get_all_assemblies(root_assembly): if asm.Location == os.path.join(bin_path, f"{site_name}.dll"): continue LOG.debug(f"Copying {asm.FullName}: {asm.Location} into {bin_path}") def hsts_disablement_arg(site): try: if not site.HSTS.Enabled: return f'-skip:objectName=hsts,absolutePath="{site.Name}"' except AttributeError as e: if not "'Site' object has no attribute 'HSTS'" in str(e): raise e return "" def ms_deploy_sync_application( site: "Site", application: "Application", destination: str, upload_target_dir: str, manifest_contents: Dict[str, Any], ) -> None: """ Synchronize an IIS application to deployment artifacts and update the manifest. Creates deployment artifacts and manifest entries for an IIS application, handling different deployment scenarios based on whether the application belongs to the Default Web Site or a custom site. Args: site: IIS Site object containing the application application: IIS Application object to be synchronized destination: Directory for deployment artifacts and logs upload_target_dir: Target directory for generated files manifest_contents: Elastic Beanstalk deployment manifest to update Manifest Handling: - Default Web Site applications: * Added to 'msDeploy' section * Uses Web Deploy for deployment * May include port reassignment if not on port 80 - Custom site applications: * Added to 'custom' section * Includes installation, restart, and removal scripts * May include ARR configuration if needed Special Cases: 1. ARR Configuration: - Generates ARR configuration scripts if proxy is enabled - Adds Windows Proxy Feature enablement scripts 2. Port Reassignment: - Handles Default Web Site running on non-standard ports 3. IIS Start Page: - Adds configuration for iisstart.htm if present Generated Files: - Application bundle (.zip) - PowerShell scripts for site management - ARR configuration scripts (if needed) - Port reassignment scripts (if needed) - Default document configuration (if needed) Notes: - File paths in manifest are normalized and made relative to CWD - Site names are normalized (spaces removed) for file naming - Manifest sections are added only if they don't already exist - Uses 'noop.ps1' for optional restart/uninstall operations """ _normalized_application_name = normalized_application_name(site, application) destination_archive_path = os.path.join( upload_target_dir, _normalized_application_name ) relative_path = os.path.relpath(upload_target_dir, os.curdir) relative_normalized_path = absolute_to_relative_normalized_path(relative_path) io.echo( f" {site.Name}{application.Path} -> .\\{relative_normalized_path}\\{_normalized_application_name}.zip" ) _iis_application_name_value = iis_application_name_value(site, application) ms_deploy_args_str = construct_ms_deploy_command_for_application( site, application, _iis_application_name_value, destination_archive_path ) LOG.debug(" Executing the following script to create destination application:") LOG.debug(f"\n {ms_deploy_args_str}\n") do_ms_deploy_sync_application( ms_deploy_args_str, destination, destination_archive_path, upload_target_dir, _normalized_application_name, ) manifest_section_name = _iis_application_name_value.strip("/") application_pool_name = application.ApplicationPoolName virts = [virt for virt in application.VirtualDirectories if virt.Path == "/"] if not virts: return physical_path = virts[0].PhysicalPath contains_iistart_htm = os.path.exists(os.path.join(physical_path, "iisstart.htm")) if site.Name != "Default Web Site": installation_script_name = f"install_site_{site.Name.replace(' ', '')}.ps1" removal_script_name = f"remove_site_{site.Name.replace(' ', '')}.ps1" restart_script_name = f"restart_site_{site.Name.replace(' ', '')}.ps1" write_custom_site_installer_script( upload_target_dir, site.Name, site.Bindings, physical_path, installation_script_name, ) write_custom_site_removal_script( upload_target_dir, site.Name, removal_script_name ) write_custom_site_restarter_script( upload_target_dir, site.Name, restart_script_name ) manifest_section = create_custom_manifest_section( manifest_section_name, installation_script_name, restart_script_name, removal_script_name, f"Custom script to install {site.Name}", ) manifest_contents["deployments"]["custom"].append(manifest_section) if _arr_enabled(): write_windows_proxy_feature_enabler_script(upload_target_dir) manifest_section = create_custom_manifest_section( "WindowsProxyFeatureEnabler", "windows_proxy_feature_enabler.ps1", "noop.ps1", "noop.ps1", f"Custom script to execute Install-WindowsFeature Web Proxy", ) add_unique_manifest_section( "WindowsProxyFeatureEnabler", manifest_contents, manifest_section ) write_arr_configuration_importer_script(upload_target_dir) manifest_section = create_custom_manifest_section( "ArrConfigurationImporterScript", "arr_configuration_importer_script.ps1", "noop.ps1", "noop.ps1", f"Custom script to enable ARR proxy", ) add_unique_manifest_section( "ArrConfigurationImporterScript", manifest_contents, manifest_section ) # manifest_contents['deployments']['custom'].append(manifest_section) else: manifest_section = { "name": manifest_section_name, "parameters": { "appBundle": f"{_normalized_application_name}.zip", "iisPath": application.Path, "iisWebSite": site.Name, }, } post_install_custom_script_section = None for binding in site.Bindings: host, port, domain = binding.get_BindingInformation().split(":") if port != "80": port_reassignment_script_name = ( "default_web_site_port_reassignment_script.ps1" ) write_default_web_site_port_reassignment_script( upload_target_dir, binding, port_reassignment_script_name ) post_install_custom_script_section = create_custom_manifest_section( "ExecuteDefaultWebSitePortReassignment", port_reassignment_script_name, port_reassignment_script_name, port_reassignment_script_name, f"Perform port-reassignment for {site.Name} away from port 80", ) break manifest_contents["deployments"]["msDeploy"].append(manifest_section) if post_install_custom_script_section: add_unique_manifest_section( "ExecuteDefaultWebSitePortReassignment", manifest_contents, post_install_custom_script_section, ) # TODO: identify all DefaultDocuments for a given site and determine # whether there are any extra ones to account for if contains_iistart_htm: write_reinstate_iisstart_htm_default_document_script(upload_target_dir) manifest_section = create_custom_manifest_section( "ReinstateIISStartHTMDefaultDocumentScript", "reinstate_iisstart_htm_default_document.ps1", "noop.ps1", "noop.ps1", f"Custom script to enable iisstart.htm default document", ) add_unique_manifest_section( "ExecuteDefaultWebSitePortReassignment", manifest_contents, manifest_section ) def add_unique_manifest_section( section_name: str, manifest_contents: Dict[str, Any], section_contents: Dict[str, Any], ) -> None: """ Add a section to the deployment manifest only if it doesn't already exist. Checks the custom deployments section of an Elastic Beanstalk manifest for a section with the specified name, and adds the new section only if no section with that name exists. Args: section_name: Name of the section to check for and potentially add manifest_contents: Complete manifest dictionary containing the deployments structure with a 'custom' list section_contents: New section configuration to add if section_name doesn't exist Notes: - Operates on manifest_contents in-place - Only checks sections under ['deployments']['custom'] - Preserves existing section if name match is found - Appends new section if no matching name is found Example: >>> manifest = { ... 'deployments': { ... 'custom': [ ... {'name': 'existing_section', 'scripts': {...}} ... ] ... } ... } >>> new_section = {'name': 'new_section', 'scripts': {...}} >>> add_unique_manifest_section('new_section', manifest, new_section) """ found = False for manifest_section in manifest_contents["deployments"]["custom"]: if manifest_section["name"] == section_name: found = True break if not found: manifest_contents["deployments"]["custom"].append(section_contents) def cleanup_previous_migration_artifacts(force: bool, verbose: bool) -> None: """ Clean up old migration directories while preserving the 'latest' migration. Removes previous migration artifacts from the 'migrations' directory, keeping only the 'latest' symbolic link and its target. Can operate in interactive or force mode. Args: force: If True, skip confirmation prompt and delete automatically. If False, prompt user for confirmation. verbose: If True, print detailed information about deleted directories. If False, log deletions at debug level only. Directory Structure: migrations/ ├── latest -> timestamp_directory/ (symlink preserved) ├── timestamp_directory/ (preserved if latest) ├── old_timestamp_1/ (deleted) └── old_timestamp_2/ (deleted) Notes: - Only operates if 'migrations' directory exists in current working directory - Preserves 'latest' symlink and its target directory - Prompts for confirmation unless force=True - Logs or prints deletion operations based on verbose setting - Handles both relative and absolute paths safely Example: >>> cleanup_previous_migration_artifacts(force=True, verbose=True) # Output: # Deleting older migration artifacts. # - Deleting directory: migrations/20240219_120000 # - Deleting directory: migrations/20240218_120000 """ migration_dir_name = "migrations" if not os.path.exists(migration_dir_name): io.echo("There is no directory called 'migrations' in PWD. Nothing to do.") return should_delete = force or io.get_boolean_response( text=prompts["migrate.should_cleanup"], default=False ) if not should_delete: return latest_path = os.path.realpath(os.path.join(migration_dir_name, "latest")) io.echo("Deleting older migration artifacts.") for item in os.listdir(migration_dir_name): if item == "latest": continue item_path = os.path.abspath(os.path.join("migrations", item)) if os.path.isdir(item_path) and item_path != latest_path: if verbose: io.echo(f" - Deleting directory: {item_path}") else: LOG.debug(f" - Deleting directory: {item_path}") shutil.rmtree(item_path) def do_ms_deploy_sync_application( ms_deploy_args_str: str, destination: str, destination_archive_path: str, upload_target_dir: str, _normalized_application_name: str, ) -> None: """ Execute Web Deploy synchronization for an IIS application and process the results. Runs msdeploy.exe with specified arguments to export an IIS application, captures output and errors, and packages the result into a ZIP file if successful. Args: ms_deploy_args_str: Complete Web Deploy command arguments string destination: Directory for log files destination_archive_path: Temporary directory for Web Deploy output upload_target_dir: Target directory for final ZIP file _normalized_application_name: Sanitized application name for ZIP file Raises: RuntimeError: If Web Deploy process exits with non-zero status Process Flow: 1. Locates msdeploy.exe 2. Executes Web Deploy with specified arguments 3. Captures stdout/stderr to log files 4. On success: - Creates ZIP from destination_archive_path - Cleans up temporary directory 5. On failure: - Logs error - Raises exception with exit code Notes: - Creates/appends to 'application.log' and 'error.log' in destination directory - Final ZIP file will be named '{_normalized_application_name}.zip' - Cleans up temporary archive directory after successful ZIP creation """ ms_deploy_exe, use_64bit = get_webdeployv3path() start_info = ProcessStartInfo() start_info.FileName = ms_deploy_exe start_info.Arguments = ms_deploy_args_str start_info.RedirectStandardOutput = True start_info.RedirectStandardError = True start_info.UseShellExecute = False start_info.CreateNoWindow = True # Start the process process = Process() process.StartInfo = start_info process.Start() # Redirect standard output and error to files with open(os.path.join(destination, "application.log"), "a") as stdout_file: stdout_file.write(process.StandardOutput.ReadToEnd()) with open(os.path.join(destination, "error.log"), "a") as stderr_file: stderr_file.write(process.StandardError.ReadToEnd()) process.WaitForExit() if process.ExitCode == 0: fileoperations.zip_up_folder( destination_archive_path, os.path.join(upload_target_dir, f"{_normalized_application_name}.zip"), ) shutil.rmtree(destination_archive_path) else: io.log_error(f"MSDeploy process exited with code {process.ExitCode}.") raise RuntimeError( f"MSDeploy process exited with code {process.ExitCode}. You can find execution logs at .\\migrations\\latest\\error.log" ) def construct_ms_deploy_command_for_application( site: "Site", application: "Application", _iis_application_name_value: str, destination_archive_path: str, ) -> str: """ Construct Web Deploy (MSDeploy) command for exporting an IIS application. Builds a command string for Web Deploy to synchronize an IIS application configuration to an archive directory, including application pool settings and security configurations. Args: site: IIS Site object containing the application application: IIS Application object to be exported _iis_application_name_value: Full application path in IIS format (e.g., "Default Web Site/MyApp") destination_archive_path: Path where the application archive will be created Returns: str: Complete MSDeploy command string with all necessary parameters and settings for application export Command Components: - Sync verb for export operation - Source from IIS application host configuration - Destination archive directory - Application pool configuration and parameterization - IIS application name parameterization - Security content handling - HSTS settings handling Example: >>> cmd = construct_ms_deploy_command_for_application( ... site, ... app, ... "Default Web Site/MyApp", ... "C:\\export\\myapp" ... ) >>> # Returns: "-verb:sync -source:apphostconfig=... -dest:archiveDir=..." """ ms_deploy_verb = "-verb:sync" ms_deploy_source = f'-source:apphostconfig="{_iis_application_name_value}"' ms_deploy_dest = f"-dest:archiveDir='{destination_archive_path}'" ms_deploy_enable_app_pool_ext = "-enableLink:AppPoolExtension" application_pool_name = application.ApplicationPoolName ms_deploy_app_pool = ( "-declareParam:name='Application Pool'," f"defaultValue='{application_pool_name}'," f"description='Application pool for application {application.Path}'," "kind=DeploymentObjectAttribute," "scope=appHostConfig," "match='application/@applicationPool'" ) iis_website_application_name_arg = ( "-declareParam:name='IIS Web Application Name'," f"defaultValue='{_iis_application_name_value.strip('/')}'" ) copy_secure_content_arg = "-enableRule:CopySecureContent" ms_deploy_args = [ ms_deploy_verb, ms_deploy_source, ms_deploy_dest, ms_deploy_enable_app_pool_ext, ms_deploy_app_pool, iis_website_application_name_arg, hsts_disablement_arg(site), copy_secure_content_arg, ] return " ".join(ms_deploy_args) def normalized_application_name(site: "Site", application: "Application") -> str: if application.Path == "/": return site.Name.replace(" ", "") return f"{site.Name}-{application.Path.strip('/')}" def get_webdeployv3path() -> Tuple[str, bool]: """ Locate the Web Deploy V3 (msdeploy.exe) installation path from Windows registry. Searches the Windows registry for Web Deploy V3 installation, checking both 64-bit and 32-bit registry views. Returns the full path to msdeploy.exe and indicates whether it was found in the 64-bit registry. Returns: tuple: (path_to_msdeploy, is_64bit) where: - path_to_msdeploy (str): Full path to msdeploy.exe - is_64bit (bool): True if found in 64-bit registry, False if in 32-bit Raises: RuntimeError: If Web Deploy V3 installation cannot be found in either registry view, with instructions for installation Notes: - Checks registry key: SOFTWARE\\Microsoft\\IIS Extensions\\MSDeploy\\3 - Tries 64-bit registry first, falls back to 32-bit - Returns InstallPath value plus "msdeploy.exe" Example: >>> path, is_64bit = get_webdeployv3path() >>> # Typical return value: >>> # ("C:\\Program Files\\IIS\\Microsoft Web Deploy V3\\msdeploy.exe", True) """ webdeployv3_key = r"SOFTWARE\Microsoft\IIS Extensions\MSDeploy\3" use_64bit = True # Try 64-bit registry first try: key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, webdeployv3_key, 0, winreg.KEY_READ | winreg.KEY_WOW64_64KEY, ) except WindowsError: use_64bit = False # If 64-bit fails, try 32-bit try: key = winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, webdeployv3_key, 0, winreg.KEY_READ | winreg.KEY_WOW64_32KEY, ) except WindowsError: error = "Couldn't find msdeploy.exe. Follow instructions here: https://learn.microsoft.com/en-us/iis/install/installing-publishing-technologies/installing-and-configuring-web-deploy" raise RuntimeError(error) install_path = winreg.QueryValueEx(key, "InstallPath")[0] winreg.CloseKey(key) return install_path + "msdeploy.exe", use_64bit def warn_about_password_protection(site, application): _iis_application_name_value = iis_application_name_value(site, application) try: for vdir in application.VirtualDirectories: if vdir.Password: io.log_warning( f"Cannot copy virtual directory associated with site because it is password protected: Site [{site.Name}/{_iis_application_name_value}], Path hosting:[{vdir.PhysicalPath}]" ) except AttributeError: pass def iis_application_name_value(site: "Site", application: "Application"): if application.Path == "/": return site.Name + "/" else: return f"{site.Name}\\{application.Path.strip('/')}" def create_noop_ps1_script(upload_target_dir): script_path = os.path.join(os.path.dirname(__file__), "migrate_scripts", "noop.ps1") with open(script_path, "r") as source_file: script = source_file.read() with open( os.path.join(upload_target_dir, "ebmigrateScripts", "noop.ps1"), "w" ) as file: file.write(script) def create_virtualdir_path_permission_script(physical_paths, upload_target_dir): script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "add_virtual_dir_read_access.ps1" ) with open(script_path, "r") as source_file: script_template = source_file.read() # Create the physical paths array string physical_paths_array_string = ",\n ".join([f'"{p}"' for p in set(physical_paths)]) # Replace the placeholder in the template script = script_template.replace( "# This will be populated dynamically with physical paths", physical_paths_array_string, ) with open( os.path.join( upload_target_dir, "ebmigrateScripts", "add_virtual_dir_read_access.ps1" ), "w", ) as file: file.write(script) def _arr_enabled() -> bool: """ Check if Application Request Routing (ARR) is enabled in IIS configuration. Examines the applicationHost.config file to determine if ARR is both installed and enabled by checking the system.webServer/proxy section's 'enabled' attribute. Returns: bool: True if ARR is installed and enabled in IIS configuration, False if ARR is not installed or is disabled Notes: - Checks applicationHost.config via ServerManager COM interface - Returns False if proxy section doesn't exist (ARR not installed) - Logs debug message if proxy section is missing - Propagates unexpected exceptions Raises: Exception: Any unexpected errors during configuration access, except for missing configuration section """ server_manager = ServerManager() try: proxy_config_section = ( server_manager.GetApplicationHostConfiguration().GetSection( "system.webServer/proxy" ) ) if proxy_config_section is None: return False return proxy_config_section.GetAttributeValue("enabled") except COMException: LOG.debug( "ConfigurationSection system.webServer/proxy does not exist in applicationHost.config." ) pass except Exception as e: raise e def export_arr_config(upload_target_dir: str, verbose: bool) -> None: """ Export IIS Application Request Routing (ARR) configuration to XML files. Exports modified (non-default) settings from ARR-related IIS configuration sections to XML files. These files can be used to replicate ARR configuration on other servers during Elastic Beanstalk deployments. Args: upload_target_dir: Base directory for deployment artifacts. Configuration files will be written to '{upload_target_dir}/ebmigrateScripts/' verbose: If True, provides detailed output about each configuration section and export operation Configuration Sections Exported: - system.webServer/proxy: ARR proxy settings - system.webServer/rewrite: URL rewrite rules - system.webServer/caching: Caching configuration Notes: - Only exports attributes that differ from default values - Creates one XML file per configuration section: * arr_config_proxy.xml * arr_config_rewrite.xml * arr_config_caching.xml - Skips sections that don't exist in the current configuration - Automatically generates ARR import script after export Raises: Exception: If export fails, with detailed error message Example XML Output: <proxy enabled="true" timeout="00:02:00" /> """ config_sections = [ "system.webServer/proxy", "system.webServer/rewrite", "system.webServer/caching", ] if not _arr_enabled() and verbose: io.echo("No Automatic Request Routing (ARR) configuration found.") return else: io.echo("Automatic Request Routing (ARR) configuration found.") server_manager = ServerManager() try: for i, section in enumerate(config_sections, 1): section_name = section.split("/")[-1] arr_config_file = f"arr_config_{section_name}.xml" arr_config_file_path = os.path.join( upload_target_dir, "ebmigrateScripts", arr_config_file ) with open(arr_config_file_path, "w") as file: config = server_manager.GetApplicationHostConfiguration() try: section_obj = config.GetSection(section) except COMException: if verbose: io.echo(f" {i}. Section {section} not found") continue modified_attributes = [ attr for attr in section_obj.Attributes if not attr.IsInheritedFromDefaultValue ] # TODO: Handle child attributes for system.webserver/caching as well xml_content = f"<{section_name}" for attr in modified_attributes: xml_content += f' {attr.Name}="{attr.Value}"' xml_content += " />" file.write(xml_content) if verbose: io.echo( f" {i}. Modified {section_name} configuration exported to {arr_config_file_path}" ) if not verbose: io.echo("Exported ARR config.") except Exception as e: io.log_error(f"Failed to export ARR configuration: {str(e)}") raise write_arr_import_script_to_source_bundle(upload_target_dir) def write_arr_import_script_to_source_bundle(upload_target_dir: str) -> None: """ Generate a PowerShell script for downloading and installing IIS ARR component. Creates a script that handles the download and installation of Application Request Routing (ARR) for IIS. The script includes verification of existing installation and error handling for download/installation failures. Args: upload_target_dir: Base directory for deployment artifacts. The script will be written to '{upload_target_dir}/ebmigrateScripts/arr_msi_installer.ps1' Notes: - Generated script downloads from Microsoft's official URL - Creates installers directory at C:\\installers\\arr-install - Reports issues to aws-elastic-beanstalk-cli GitHub repository """ script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "arr_msi_installer.ps1" ) with open(script_path, "r") as source_file: script = source_file.read() with open( os.path.join(upload_target_dir, "ebmigrateScripts", "arr_msi_installer.ps1"), "w", ) as file: file.write(script) def write_windows_proxy_feature_enabler_script(upload_target_dir): script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "windows_proxy_feature_enabler.ps1", ) with open(script_path, "r") as source_file: script_contents = source_file.read() with open( os.path.join( upload_target_dir, "ebmigrateScripts", "windows_proxy_feature_enabler.ps1" ), "w", ) as file: file.write(script_contents) def write_arr_configuration_importer_script(upload_target_dir: str) -> None: """ Generate a PowerShell script for importing Application Request Routing (ARR) configuration. Creates a script that handles the import of ARR configuration from XML files, including proxy, rewrite, and caching settings. The script includes backup functionality and type-safe configuration import. Args: upload_target_dir: Base directory for deployment artifacts. The script will be written to '{upload_target_dir}/ebmigrateScripts/arr_configuration_importer_script.ps1' Notes: - Generated script requires WebAdministration PowerShell module - Handles three IIS configuration sections: * system.webServer/proxy * system.webServer/rewrite * system.webServer/caching - Expects configuration files in C:\\staging\\ebmigrateScripts\\ """ script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "arr_configuration_importer_script.ps1", ) with open(script_path, "r") as source_file: script_contents = source_file.read() with open( os.path.join( upload_target_dir, "ebmigrateScripts", "arr_configuration_importer_script.ps1", ), "w", ) as file: file.write(script_contents) def write_custom_site_installer_script( upload_target_dir: str, site_name: str, bindings: List["Binding"], physical_path: str, installation_script_name: str, ) -> None: """ Generate a PowerShell script for installing and configuring an IIS website. Creates an installation script that will be referenced in the Elastic Beanstalk deployment manifest's custom section. The script handles complete website setup including app pool creation, website configuration, and permissions. If Application Request Routing (ARR) is enabled in IIS, additional ARR configuration is included. Args: upload_target_dir: Base directory for deployment artifacts site_name: Name of the IIS website to create bindings: List of IIS binding objects defining site endpoints physical_path: Physical path where website content will be deployed installation_script_name: Name of the PowerShell script file to generate Generated Script Features: - Creates and configures application pool with .NET 4.0 runtime - Creates website with specified bindings and physical path - Deploys content using Web Deploy (msdeploy.exe) - Sets appropriate file system permissions - Handles ARR configuration if proxy is enabled: * Installs ARR components if needed * Imports ARR configuration from XML files * Configures proxy settings Notes: - Script requires WebAdministration PowerShell module - Uses site_name for both website and app pool names - Expects website content at 'C:\\staging\\{site_name}.zip' - Includes ARR configuration only if proxy is enabled in IIS - Generated script is referenced in EB deployment manifest """ binding_protocol_tuples = [] invoke_arr_import_script_call = "" for binding in bindings: binding_string = binding.BindingInformation # Always add the binding information regardless of ARR status if binding_string and binding_string.strip(): binding_protocol_tuples.append( f'"{binding_string.strip()}" = "{binding.Protocol.lower()}"' ) # Only set ARR import script if ARR is enabled if _arr_enabled(): invoke_arr_import_script_call = "Invoke-ARRImportScript" binding_protocol_powershell_array = "\n".join(binding_protocol_tuples) # Read the template script script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "site_installer_template.ps1" ) with open(script_path, "r") as source_file: script_template = source_file.read() # Replace placeholders in the template script_content = ( script_template.replace("{site_name}", site_name) .replace( "{binding_protocol_powershell_array}", binding_protocol_powershell_array ) .replace("{physical_path}", physical_path) .replace("{invoke_arr_import_script_call}", invoke_arr_import_script_call) ) with open( os.path.join(upload_target_dir, "ebmigrateScripts", installation_script_name), "w", ) as file: file.write(script_content) def write_custom_site_removal_script( upload_target_dir: str, site_name: str, uninstallation_script_name: str ) -> None: """ Generate a PowerShell script for removing an IIS website during uninstallation. Creates an uninstallation script that will be referenced in the Elastic Beanstalk deployment manifest's custom section, typically used as an uninstall script in a custom deployment action. Args: upload_target_dir: Base directory for deployment artifacts site_name: Name of the IIS website to remove uninstallation_script_name: Name of the PowerShell script file to generate (must have .ps1 extension) Notes: - Creates script in '{upload_target_dir}/ebmigrateScripts/{uninstallation_script_name}' - Generated script requires WebAdministration PowerShell module - Script includes utility functions from ebdeploy_utils.ps1 """ # Read the template script script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "site_removal_template.ps1" ) with open(script_path, "r") as source_file: script_template = source_file.read() # Replace the site_name placeholder script_contents = script_template.replace("{site_name}", site_name) with open( os.path.join(upload_target_dir, "ebmigrateScripts", uninstallation_script_name), "w", ) as file: file.write(script_contents) def write_custom_site_restarter_script( upload_target_dir: str, site_name: str, restarter_script_name: str ) -> None: """ Generate a PowerShell script for restarting an IIS website during deployment. Creates a restart script that will be referenced in the Elastic Beanstalk deployment manifest's custom section, typically used as a restart script in a custom deployment action. Args: upload_target_dir: Base directory for deployment artifacts site_name: Name of the IIS website to restart restarter_script_name: Name of the PowerShell script file to generate (must have .ps1 extension) Notes: - Creates script in '{upload_target_dir}/ebmigrateScripts/{restarter_script_name}' - Generated script requires WebAdministration PowerShell module - Script includes utility functions from ebdeploy_utils.ps1 """ # Read the template script script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "site_restart_template.ps1" ) with open(script_path, "r") as source_file: script_template = source_file.read() # Replace the site_name placeholder script_contents = script_template.replace("{site_name}", site_name) with open( os.path.join(upload_target_dir, "ebmigrateScripts", restarter_script_name), "w" ) as file: file.write(script_contents) def write_default_web_site_port_reassignment_script( upload_target_dir: str, binding: "Binding", port_reassignment_script_name: str ) -> None: """ Generate a PowerShell script for IIS Default Web Site port reassignment. Args: upload_target_dir: Base directory for deployment artifacts binding: IIS Binding object containing the target binding configuration port_reassignment_script_name: Name of the PowerShell script file to generate (must have .ps1 extension) Notes: - Creates script in '{upload_target_dir}/ebmigrateScripts/{port_reassignment_script_name}' - Generated script requires WebAdministration PowerShell module - Script includes utility functions from ebdeploy_utils.ps1 """ host, port, domain = binding.BindingInformation.split(":") # Read the template script script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "default_web_site_port_reassignment_template.ps1", ) with open(script_path, "r") as source_file: script_template = source_file.read() # Replace placeholders in the template script_content = ( script_template.replace("{host}", host) .replace("{port}", port) .replace("{domain}", domain) ) with open( os.path.join( upload_target_dir, "ebmigrateScripts", port_reassignment_script_name ), "w", ) as file: file.write(script_content) def write_ebdeploy_utility_script(upload_target_dir: str) -> None: """ Generate a PowerShell utility script containing common deployment functions. Creates ebdeploy_utils.ps1, a shared PowerShell module used by other deployment scripts. This utility script provides common functions for logging and ACL management during the Elastic Beanstalk deployment process. Args: upload_target_dir: Base directory for deployment artifacts. The script will be written to '{upload_target_dir}/ebmigrateScripts/ebdeploy_utils.ps1' Notes: - Script is imported by other deployment scripts using dot-sourcing - All logging functions use UTC timestamps for consistency - ACL rules include both container and object inheritance - All access rules are "Allow" type """ script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "ebdeploy_utils.ps1" ) with open(script_path, "r") as source_file: script_content = source_file.read() with open( os.path.join(upload_target_dir, "ebmigrateScripts", "ebdeploy_utils.ps1"), "w" ) as file: file.write(script_content) def write_copy_firewall_config_script( upload_target_dir: str, sites: List["Site"] ) -> None: """ Generate and configure deployment of Windows Firewall rules based on IIS site bindings. Creates a PowerShell script that replicates the source environment's firewall configuration on the target environment. The script is added to the Elastic Beanstalk deployment manifest for execution during deployment. The function: 1. Extracts HTTP/HTTPS ports from IIS site bindings 2. Retrieves existing firewall rules for those ports 3. Generates New-NetFirewallRule commands for each rule 4. Creates a deployment script in the ebmigrateScripts directory 5. Adds the script to the deployment manifest's custom section Args: upload_target_dir: Base directory for deployment artifacts. The script will be written to '{upload_target_dir}/ebmigrateScripts/modify_firewall_config.ps1' Notes: - Only processes HTTP and HTTPS site bindings - Generates inbound firewall rules only - Preserves original rule properties: * Display name * Action (Allow/Block) * Protocol * Port specifications * Enabled state - Uses noop.ps1 for restart and uninstall operations - If no relevant firewall rules are found, no script is generated - Modifies aws-windows-deployment-manifest.json to include the script Warning: Current implementation does not handle cleanup of firewall rules when sites are removed. Firewall rules persist after site removal. """ ports = set() for site in sites: for binding in site.Bindings: host, port, domain = binding.BindingInformation.split(":") protocol = binding.Protocol if protocol in ["http", "https"]: ports.add(port.strip()) firewall_rules = get_firewall_rules(ports) powershell_commands = [] for rule in firewall_rules: command = ( f'New-NetFirewallRule -DisplayName "{rule["Name"]}" ' f'-Direction Inbound -Action {rule["Action"]} ' f'-Protocol {rule["Protocol"]} -LocalPort {rule["LocalPorts"]} ' ) if not rule["Enabled"]: command += "-Enabled False" powershell_commands.append(command) if not powershell_commands: return # Read the template script script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "modify_firewall_config.ps1" ) with open(script_path, "r") as source_file: script_template = source_file.read() # Replace the placeholder with the actual firewall commands script_content = script_template.replace( "{firewall_rules}", "\n".join(powershell_commands) ) with open( os.path.join( upload_target_dir, "ebmigrateScripts", "modify_firewall_config.ps1" ), "w", ) as file: file.write(script_content) with open( os.path.join(upload_target_dir, "aws-windows-deployment-manifest.json") ) as file: manifest = json.load(file) manifest["deployments"]["custom"].append( create_custom_manifest_section( "ModifyFirewallConfig", "modify_firewall_config.ps1", "noop.ps1", "noop.ps1", ) ) with open( os.path.join(upload_target_dir, "aws-windows-deployment-manifest.json"), "w" ) as file: json.dump(manifest, file, indent=4) def create_custom_manifest_section( section_name: str, install_file: str, restart_file: str, uninstall_file: str, description: Optional[str] = None, ) -> Dict[str, any]: """ Create a custom deployment section for the AWS Windows Deployment Manifest. Generates a configuration dictionary for the 'custom' section of an Elastic Beanstalk Windows deployment manifest. This section defines PowerShell scripts to be executed during installation, restart, and uninstallation phases of deployment. Args: section_name: Name identifier for this custom deployment section install_file: Name of PowerShell script to run during installation restart_file: Name of PowerShell script to run during restart uninstall_file: Name of PowerShell script to run during uninstallation description: Optional description of the deployment section. Defaults to section_name if not provided. Returns: Dictionary containing the custom deployment section configuration: { "name": section name, "description": section description, "scripts": { "install": {"file": "ebmigrateScripts\\install_script.ps1"}, "restart": {"file": "ebmigrateScripts\\restart_script.ps1"}, "uninstall": {"file": "ebmigrateScripts\\uninstall_script.ps1"} } } Notes: - All scripts are expected to be in the 'ebmigrateScripts' directory - This section will be added to the 'custom' array in the manifest - The manifest is used by Elastic Beanstalk to orchestrate deployments """ ebmigrate_scripts_dir_name = "ebmigrateScripts" return { "name": section_name, "description": description or section_name, "scripts": { "install": {"file": f"{ebmigrate_scripts_dir_name}\\{install_file}"}, "restart": {"file": f"{ebmigrate_scripts_dir_name}\\{restart_file}"}, "uninstall": {"file": f"{ebmigrate_scripts_dir_name}\\{uninstall_file}"}, }, } def is_port_in_rule(port: Union[int, str], local_ports: str) -> bool: """ Check if a specific port is covered by a Windows Firewall rule's port specification. Evaluates whether a port number matches a firewall rule's local ports definition, handling both individual ports and port ranges. Args: port: Port number to check (can be integer or string) local_ports: Port specification string from firewall rule. Can be: - Individual ports: "80", "443" - Port ranges: "8081-8083" - Multiple specifications: "80,443,8081-8083" - "*" (all ports) Returns: True if the port is covered by the local_ports specification, False otherwise Notes: - Returns False for empty port specifications or "*" - Handles comma-separated lists of port specifications - Port ranges must be numeric and properly formatted (start-end) - Whitespace around port specifications is ignored Examples: >>> is_port_in_rule(80, "80") True >>> is_port_in_rule(80, "80,443") True >>> is_port_in_rule(8082, "8081-8083") True >>> is_port_in_rule(8080, "*") False """ if not local_ports or local_ports == "*": return False port = str(port) for part in local_ports.split(","): part = part.strip() # Handle ports specified as ranges. e.g. 8081-8083 if "-" in part: start, end = part.split("-") if start.isdigit() and end.isdigit(): if int(start) <= int(port) <= int(end): return True elif part == port: return True return False def get_firewall_rules(ports_to_check: Set[int]) -> List[Dict[str, Any]]: """ Retrieve Windows Firewall rules that apply to specified HTTP/HTTPS ports. Uses the Windows Firewall COM interface (INetFwPolicy2) to enumerate firewall rules and filter those that affect the specified ports. Args: ports_to_check: Set of port numbers (typically HTTP/HTTPS ports) to check for associated firewall rules Returns: List of dictionaries, each representing a firewall rule with: - Name: Rule name (str) - ServiceName: Associated Windows service name (str) - Protocol: 'TCP' or 'UDP' - LocalPorts: Port specification string - Action: 'Allow' or 'Block' - Enabled: Boolean indicating if rule is active Raises: EnvironmentError: If unable to access or query the Windows Firewall Notes: - Uses Windows Firewall COM interface (HNetCfg.FwPolicy2) - Only includes rules that explicitly reference the specified ports - Protocol values are mapped from: * 6 -> 'TCP' * 17 -> 'UDP' - Action values are mapped from: * 1 -> 'Allow' * 2 -> 'Block' """ try: fw_policy = win32com.client.Dispatch("HNetCfg.FwPolicy2") except Exception as e: io.log_error( f"Encountered failure during firewall configuration analysis: {str(e)}" ) raise EnvironmentError(e) rules = fw_policy.Rules rule_list = [] for rule in rules: if rule.LocalPorts and any( is_port_in_rule(p, rule.LocalPorts) for p in ports_to_check ): rule_list.append( { "Name": rule.Name, "ServiceName": rule.ServiceName, "Protocol": "TCP" if rule.Protocol == 6 else "UDP", "LocalPorts": rule.LocalPorts, "Action": "Allow" if rule.Action == 1 else "Block", "Enabled": rule.Enabled, } ) return rule_list def write_reinstate_iisstart_htm_default_document_script( upload_target_dir: str, ) -> None: """ Generate a PowerShell script to restore IIS's default document configuration. Creates a PowerShell script that ensures 'iisstart.htm' is reinstated as a default document in IIS's configuration. This is necessary because Elastic Beanstalk's deployment process removes 'iisstart.htm' from IIS's default document list, which can affect sites relying on this default document behavior. Default documents in IIS determine which files (e.g., 'iisstart.htm', 'default.htm', 'index.html') are served when a user requests a directory without specifying a specific file. Args: upload_target_dir: Base directory for deployment artifacts. The script will be written to '{upload_target_dir}/ebmigrateScripts/reinstate_iisstart_htm_default_document.ps1' Notes: - Script execution is managed by Elastic Beanstalk through its manifest file - The manifest entry ensures this script runs during deployment - Uses Add-WebConfigurationProperty cmdlet to modify IIS configuration - Modifies system.webServer/defaultDocument/files configuration section - Script is typically deployed to '.\\migrations\\latest\\upload_target\\ebmigrateScripts\' - Requires IIS WebAdministration module to be available on target system """ script_path = os.path.join( os.path.dirname(__file__), "migrate_scripts", "reinstate_iisstart_htm_default_document.ps1", ) with open(script_path, "r") as source_file: script_contents = source_file.read() with open( os.path.join( upload_target_dir, "ebmigrateScripts", "reinstate_iisstart_htm_default_document.ps1", ), "w", ) as file: file.write(script_contents) # TODO: allow override through .ebextensions or a `--alb-configs alb-configs.json` def get_listener_configs(sites: List["Site"], ssl_certificate_domain_name: str = None): """ Generate complete Elastic Beanstalk listener configurations from IIS site configurations. Processes IIS sites to create comprehensive ALB listener configurations, including HTTP and HTTPS listeners, rules, and process mappings. Handles both protocol types and automatically determines default processes. Args: sites: List of IIS Site objects to generate listener configs from ssl_certificate_domain_name: Optional ARN of SSL certificate for HTTPS listeners. Required if any HTTPS listeners are configured. Returns: List of Elastic Beanstalk option settings containing: - HTTP/HTTPS listener configurations with default processes - Listener rules with priorities and conditions - Process configurations with ports and protocols - Protocol mappings for each target group Returns empty list if no valid listener rules are found or on error. Notes: - Processes sites to extract bindings and create corresponding ALB rules - Handles both HTTP and HTTPS protocols if certificate provided - Creates process mappings based on port numbers - Assigns rule priorities based on specificity - Treats configuration errors as non-fatal """ option_settings = [] try: site_configs = get_site_configs(sites=sites) alb_rules = create_alb_rules(site_configs) converted_alb_rules = convert_alb_rules_to_option_settings( alb_rules, ssl_certificate_domain_name ) http_listener_rule_option_settings = ( converted_alb_rules.http_listener_rule_option_settings ) if ssl_certificate_domain_name: https_listener_rule_option_settings = ( converted_alb_rules.https_listener_rule_option_settings ) else: https_listener_rule_option_settings = [] process_protocol_mappings = converted_alb_rules.process_protocol_mappings if ( not http_listener_rule_option_settings and not https_listener_rule_option_settings ): return [] http_listener_rule_names = _extract_and_join_rule_names( http_listener_rule_option_settings ) if ssl_certificate_domain_name: https_listener_rule_names = _extract_and_join_rule_names( https_listener_rule_option_settings ) else: https_listener_rule_names = [] http_processes = _extract_process_values(http_listener_rule_option_settings) if ssl_certificate_domain_name: https_processes = _extract_process_values( https_listener_rule_option_settings ) else: https_processes = [] default_process_name = None if "default" in http_processes: default_process_name = "default" elif http_processes: default_process_name = sorted(list(http_processes))[0] default_https_process_name = None if https_processes: default_https_process_name = sorted(list(https_processes))[0] option_settings.extend( _create_http_listener_settings( default_process_name, http_listener_rule_names ) ) if https_listener_rule_option_settings: option_settings.extend( _create_https_listener_settings( default_https_process_name, https_listener_rule_names, ssl_certificate_domain_name, ) ) process_option_settings = _create_process_option_settings( process_protocol_mappings ) option_settings += ( http_listener_rule_option_settings + https_listener_rule_option_settings + process_option_settings ) return option_settings except Exception as e: io.log_warning( f"Error: {str(e)}. Treating listener rule creation as non-fatal. This might cause environment to be in degraded state." ) def _create_process_option_settings( process_protocol_mappings: Dict[str, str], ) -> List[dict]: """ Create Elastic Beanstalk process option settings from process-protocol mappings. Generates configuration settings for each process, including protocol and port settings. The 'default' process is mapped to port 80, while other processes use their name as the port. Args: process_protocol_mappings: Dictionary mapping process names to their protocols (e.g., {'default': 'HTTP', '8080': 'HTTPS'}) Returns: List of option settings dictionaries, each containing: - Namespace: AWS namespace for the process (aws:elasticbeanstalk:environment:process:{process}) - OptionName: Either 'Protocol' or 'Port' - Value: The corresponding protocol or port value Example: >>> _create_process_option_settings({'default': 'HTTP', '8080': 'HTTPS'}) [ { 'Namespace': 'aws:elasticbeanstalk:environment:process:default', 'OptionName': 'Protocol', 'Value': 'HTTP' }, { 'Namespace': 'aws:elasticbeanstalk:environment:process:default', 'OptionName': 'Port', 'Value': '80' }, { 'Namespace': 'aws:elasticbeanstalk:environment:process:8080', 'OptionName': 'Protocol', 'Value': 'HTTPS' }, { 'Namespace': 'aws:elasticbeanstalk:environment:process:8080', 'OptionName': 'Port', 'Value': '8080' } ] """ settings = [] for process, protocol in process_protocol_mappings.items(): port = "80" if process == "default" else process namespace = f"aws:elasticbeanstalk:environment:process:{process}" settings.extend( [ { "Namespace": namespace, "OptionName": "Protocol", "Value": protocol.upper(), }, {"Namespace": namespace, "OptionName": "Port", "Value": port}, ] ) return settings def _create_listener_settings( namespace: str, option_name_value_pairs: List[Tuple[str, str]] ) -> List[dict]: """ Create Elastic Beanstalk listener option settings from name-value pairs. Args: namespace: The AWS Elastic Beanstalk namespace for the settings option_name_value_pairs: List of tuples containing (option_name, value) pairs Returns: List of dictionaries, each containing: - Namespace: The provided namespace - OptionName: Name of the option - Value: Value for the option """ return [ {"Namespace": namespace, "OptionName": option_name, "Value": value} for option_name, value in option_name_value_pairs ] def _create_http_listener_settings( default_process_name: str, rule_names: str ) -> List[dict]: """ Create HTTP listener configuration settings for Elastic Beanstalk. Creates settings for an HTTP listener with specified default process and rules. Args: default_process_name: Name of the default process to handle requests rule_names: Comma-separated string of rule names Returns: List of option settings configuring an HTTP listener with: - HTTP protocol - Specified default process - Enabled listener - Specified routing rules """ option_name_value_pairs = [ ("Protocol", "HTTP"), ("DefaultProcess", default_process_name), ("ListenerEnabled", "true"), ("Rules", rule_names), ] return _create_listener_settings( "aws:elbv2:listener:default", option_name_value_pairs ) def _create_https_listener_settings( default_process_name: str, rule_names: str, ssl_cert: str ) -> List[dict]: """ Create HTTPS listener configuration settings for Elastic Beanstalk. Creates settings for an HTTPS listener with specified default process, rules, and SSL certificate. Args: default_process_name: Name of the default process to handle requests rule_names: Comma-separated string of rule names ssl_cert: ARN of the SSL certificate to use for HTTPS Returns: List of option settings configuring an HTTPS listener with: - HTTPS protocol - Specified default process - Enabled listener - Specified routing rules - SSL certificate configuration """ option_name_value_pairs = [ ("Protocol", "HTTPS"), ("DefaultProcess", default_process_name), ("ListenerEnabled", "true"), ("Rules", rule_names), ] if ssl_cert: option_name_value_pairs.append(("SSLCertificate", ssl_cert)) return _create_listener_settings("aws:elbv2:listener:443", option_name_value_pairs) def _extract_and_join_rule_names(option_settings: List[dict]) -> str: rule_names = {option["Namespace"].split(":")[-1] for option in option_settings} return ",".join(sorted(rule_names)) def _extract_process_values(option_settings: List[dict]) -> Set[str]: return { setting["Value"] for setting in option_settings if setting["OptionName"] == "Process" } @dataclass class ConvertedALBRules: """ Container for ALB rule configurations separated by protocol. Attributes: http_listener_rule_option_settings: List of option settings for HTTP listener rules https_listener_rule_option_settings: List of option settings for HTTPS listener rules process_protocol_mappings: Dictionary mapping process names to their protocols """ http_listener_rule_option_settings: List[Dict[str, str]] https_listener_rule_option_settings: List[Dict[str, str]] process_protocol_mappings: Dict[str, str] def convert_alb_rules_to_option_settings( alb_rules: List[Dict[str, Any]], ssl_certificate_domain_name: Optional[str] ) -> ConvertedALBRules: """ Convert ALB rules into Elastic Beanstalk option settings format. Transforms ALB rules into EB option settings, organizing them by protocol (HTTP/HTTPS) and creating the necessary listener rule configurations. Args: alb_rules: List of ALB rule dictionaries, each containing: - Priority: Rule priority number - Protocol: 'HTTP' or 'HTTPS' - Conditions: List of routing conditions (host-headers, path-pattern) - Actions: Forward actions with target group configuration Returns: ConvertedALBRules object containing: - HTTP listener rule options - HTTPS listener rule options - Process to protocol mappings Each listener rule option is a dictionary with: - Namespace: 'aws:elbv2:listenerrule:ruleN' where N is the rule number - OptionName: One of 'Priority', 'Process', 'HostHeaders', or 'PathPatterns' - Value: Corresponding value for the option Notes: - Process names are extracted from target group ARNs - Port 80 is mapped to 'default' process name - If no host header is specified, path pattern defaults to '*' """ http_listener_rule_option_settings: List[Dict[str, str]] = [] https_listener_rule_option_settings: List[Dict[str, str]] = [] process_protocol_mappings: Dict[str, str] = dict() for i, rule in enumerate(alb_rules, 1): host_header, path_pattern = None, None conditions = rule.get("Conditions", []) if conditions: for condition in conditions: if condition["Field"] == "host-header": host_header = condition["Values"][0] if condition["Field"] == "path-pattern": path_pattern = condition["Values"][0] namespace = f"aws:elbv2:listenerrule:rule{i}" protocol = rule["Protocol"].upper() listener_rule_option_settings = list() listener_rule_option_settings.append( { "Namespace": namespace, "OptionName": "Priority", "Value": str(rule["Priority"]), } ) target_group = rule["Actions"][0]["ForwardConfig"]["TargetGroups"][0][ "TargetGroupArn" ] process = target_group.split("/")[-1] if process == "80": process = "default" listener_rule_option_settings.append( {"Namespace": namespace, "OptionName": "Process", "Value": process} ) if host_header: listener_rule_option_settings.append( { "Namespace": namespace, "OptionName": "HostHeaders", "Value": host_header, } ) if not host_header: path_pattern = path_pattern or "*" if path_pattern: listener_rule_option_settings.append( { "Namespace": namespace, "OptionName": "PathPatterns", "Value": path_pattern, } ) if protocol.strip().lower() == "http": http_listener_rule_option_settings.extend(listener_rule_option_settings) else: https_listener_rule_option_settings.extend(listener_rule_option_settings) if protocol.strip().lower() == "http": process_protocol_mappings[process] = protocol.strip().upper() elif protocol.strip().lower() == "https" and ssl_certificate_domain_name: process_protocol_mappings[process] = protocol.strip().upper() return ConvertedALBRules( http_listener_rule_option_settings, https_listener_rule_option_settings, process_protocol_mappings, ) class SiteConfig: """ Configuration class representing an IIS website with its binding and path information. Args: name: The name of the IIS website binding_info: A colon-separated string containing binding information in the format "ip:port:hostname". Example: "*:80:example.com" physical_path: The filesystem path where the website content is located protocol: The web protocol used by the site ('http' or 'https') Attributes: name (str): The name of the IIS website port (int): The port number extracted from binding_info host_header (Optional[str]): The hostname for the site, or None if not specified physical_path (str): The filesystem path where the website content is located protocol (str): The lowercase protocol ('http' or 'https') rewrite_rules (List): A list of rewrite rules for the site (empty by default) Raises: ValueError: If binding_info doesn't contain enough segments for parsing """ def __init__( self, name: str, binding_info: str, physical_path: str, protocol: str ) -> None: self.name = name self.port = int(binding_info.split(":")[1]) self.host_header = binding_info.split(":")[2] or None self.physical_path = physical_path self.protocol = protocol.lower() self.rewrite_rules: List = [] def _parse_binding_info(binding: "Binding") -> Dict[str, Union[str, int]]: """ Parse an IIS binding configuration into a dictionary of its components. Takes a Binding object from IIS's configuration and splits its BindingInformation string (format: "IP:port:hostname") into individual components. Args: binding: An IIS Binding object containing BindingInformation and Protocol properties Returns: A dictionary containing: - 'ip' (str): IP address or '*' for all addresses - 'port' (int): Port number - 'host' (str): Hostname or empty string if not specified - 'protocol' (str): Protocol type ('http' or 'https') Raises: IndexError: If binding.BindingInformation doesn't contain the expected three colon-separated values """ parts = binding.BindingInformation.split(":") return { "ip": parts[0], "port": int(parts[1]), "host": parts[2], "protocol": binding.Protocol, # This should give us http/https } def get_site_configs(sites: List["Site"]): """ Retrieve and parse IIS site configurations from the local server. Connects to IIS using ServerManager and processes each site's: - Basic site information (name, bindings) - Physical path information - URL rewrite rules from web.config files Args: sites: List of IIS Site objects to process Returns: List[SiteConfig]: A list of SiteConfig objects, each containing: - name: Site name from IIS - binding_info: Parsed binding information (ip:port:hostname) - physical_path: Site's root directory path - protocol: HTTP/HTTPS protocol - rewrite_rules: List of dictionaries containing URL rewrite rules from web.config: * name: Rule name * pattern: URL pattern to match * action_type: Type of rewrite action (Rewrite/Redirect) * action_url: Target URL for rewrite Notes: - Processes each site's bindings separately - Parses web.config files for URL rewrite rules - Skips invalid web.config files with warning - Requires administrative access to IIS """ site_configs = [] for site in sites: for binding in site.Bindings: binding_info = _parse_binding_info(binding) config = SiteConfig( name=site.Name, binding_info=binding.BindingInformation, physical_path=site.Applications["/"] .VirtualDirectories["/"] .PhysicalPath, protocol=binding_info["protocol"], ) # Get rewrite rules from web.config web_config_path = os.path.join(config.physical_path, "web.config") if os.path.exists(web_config_path): try: tree = ET.parse(web_config_path) root = tree.getroot() rules = root.findall(".//rewrite/rules/rule") for rule in rules: config.rewrite_rules.append( { "name": rule.get("name"), "pattern": rule.find("match").get("url"), "action_type": rule.find("action").get("type"), "action_url": rule.find("action").get("url"), } ) except Exception as e: io.log_warning( f"Error reading web.config for {site.Name}: {str(e)}. Skipping over rewrite rule identification for {site.Name}" ) site_configs.append(config) return site_configs def _sort_rules_by_specificity(rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Sort Application Load Balancer rules by their specificity in descending order. Rules are sorted based on a scoring system where: - Path pattern conditions contribute 20 points - Host header conditions contribute 10 points The total score determines the rule's specificity, with higher scores indicating more specific rules. Args: rules: A list of ALB rule dictionaries. Each rule is a dictionary containing: - 'Actions': List of action configurations - 'Protocol': String indicating the protocol (e.g., "http") - 'Conditions' (optional): List of condition dictionaries with 'Field' and 'Values' Returns: A new list containing the same rules sorted by specificity in descending order, where rules with more specific conditions (higher scores) appear first. Example: A rule with both path-pattern and host-header conditions (score: 30) would be sorted before a rule with only a host-header condition (score: 10). """ def calculate_rule_specificity(rule): score = 0 conditions = rule.get("Conditions", []) for condition in conditions: if condition["Field"] == "host-header": score += 10 elif condition["Field"] == "path-pattern": score += 20 return score return sorted(rules, key=calculate_rule_specificity, reverse=True) def create_alb_rules(site_configs: List["SiteConfig"]) -> List[Dict[str, Any]]: """ Create Application Load Balancer (ALB) rules from IIS site configurations. Transforms IIS site configurations into ALB rules by: 1. Grouping sites by port 2. Creating host-header based rules for each site 3. Creating path-pattern based rules from site rewrite rules 4. Sorting rules by specificity and assigning priorities Args: site_configs: List of SiteConfig objects containing IIS site configurations Returns: List of ALB rule dictionaries, each containing: - Priority: Integer indicating rule precedence (1-based) - Protocol: The protocol ('http' or 'https') - Actions: Forward action configuration with target group ARN - Conditions: List of conditions: * host-header conditions from site bindings * path-pattern conditions from rewrite rules Process Flow: 1. Groups sites by port for target group creation 2. Processes host-header based rules first 3. Processes URL rewrite rules from web.config 4. Sorts rules by specificity (host+path > host > path) 5. Assigns sequential priorities to sorted rules Notes: - Creates synthetic target group ARNs using port numbers - Deduplicates patterns per host header - Only processes HTTP/HTTPS protocols - Host header rules take precedence over path patterns """ port_groups = collections.defaultdict(list) valid_protocols = ["http", "https"] for config in site_configs: if config.protocol in valid_protocols: port_groups[config.port].append(config) target_group_template_arn = ( "arn:aws:elasticloadbalancing:region:account-id:targetgroup/{port}" ) target_groups = { port: target_group_template_arn.format(port=port) for port in port_groups.keys() } unsorted_rules = _process_host_header_based_rules(port_groups, target_groups) processed_patterns = set() rewrite_rules = _process_url_rewrite_rules( port_groups, processed_patterns, target_groups ) unsorted_rules.extend(rewrite_rules) sorted_rules = _sort_rules_by_specificity(unsorted_rules) return _assign_priorities_after_sorting(sorted_rules) def _process_host_header_based_rules(port_groups, target_groups): host_header_rules = [] for iis_port, sites in port_groups.items(): for site in sites: host_rule = { "Actions": [ { "Type": "forward", "ForwardConfig": { "TargetGroups": [ {"TargetGroupArn": target_groups[iis_port], "Weight": 1} ] }, } ], "Protocol": site.protocol, } if site.host_header: host_rule["Conditions"] = [ {"Field": "host-header", "Values": [site.host_header]} ] host_header_rules.append(host_rule) continue return host_header_rules def _process_url_rewrite_rules(port_groups, processed_patterns, target_groups): rules = [] for iis_port, sites in port_groups.items(): for site in sites: for rule in site.rewrite_rules: rewrite_rule = _process_url_rewrite_rule( rule, processed_patterns, site, target_groups, iis_port ) rules.append(rewrite_rule) return rules def _process_url_rewrite_rule(rule, processed_patterns, site, target_groups, iis_port): # Create unique key for pattern+host combination pattern_key = f"{rule['pattern']}:{site.host_header}" # Skip if we've already processed this pattern for this host if pattern_key in processed_patterns: return None processed_patterns.add(pattern_key) conditions = [] if site.host_header: conditions.append({"Field": "host-header", "Values": [site.host_header]}) conditions.append( {"Field": "path-pattern", "Values": [translate_iis_to_alb(rule["pattern"])]} ) return _create_rewrite_rule(conditions, target_groups[iis_port], site.protocol) def _create_rewrite_rule( conditions: List[Dict[str, Union[str , List[str]]]], target_group_arn: str, protocol: str ) -> Dict[str, Any]: return { "Conditions": conditions, "Actions": [ { "Type": "forward", "ForwardConfig": { "TargetGroups": [{"TargetGroupArn": target_group_arn, "Weight": 1}] }, } ], "Protocol": protocol, } def _assign_priorities_after_sorting( sorted_rules: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: alb_rules = [] for priority, rule in enumerate(sorted_rules, 1): rule["Priority"] = priority alb_rules.append(rule) return alb_rules def translate_iis_to_alb(iis_pattern: str) -> str: """ Convert an IIS (Internet Information Services) URL rewrite pattern to an ALB (Application Load Balancer) path pattern. This function takes a URL rewrite pattern typically used in IIS and transforms it into a format compatible with AWS Application Load Balancer (ALB) path patterns. The transformation involves several steps to simplify and standardize the pattern. Parameters: iis_pattern (str): The IIS URL rewrite pattern to be converted. This pattern may contain various regex elements such as anchors, capture groups, character classes, and quantifiers. Returns: str: The converted ALB path pattern. This pattern will be simplified to use '*' as a wildcard character and will be prefixed with a '/' if it does not already start with one. Example: >>> translate_iis_to_alb("^/products/{id:int}/details$") '/products/*/details' >>> translate_iis_to_alb("^/users/([a-zA-Z0-9]+)/profile$") '/users/*/profile' >>> translate_iis_to_alb("/articles/(.*)") '/articles/*' Note: - The function assumes that the input pattern is a valid IIS URL rewrite pattern. - The resulting ALB pattern will use '*' as a wildcard to match any sequence of characters. - Complex regex features not covered by the simplification rules will be approximated with '*'. """ alb_pattern = iis_pattern.strip("^$") # replace .+ with * alb_pattern = re.sub(r"\.\+", "?*", alb_pattern) # replace .* with * alb_pattern = re.sub(r"\.\*", "*", alb_pattern) # replace {segment} with * alb_pattern = re.sub("({.*})", "*", alb_pattern) alb_pattern = alb_pattern.lstrip("^") # replace groupings of the type "[0-9]+" with "*" alb_pattern = re.sub(r"\[.*?\]\+", "*", alb_pattern) # replace groupings of the type "[0-9]*" with "*" alb_pattern = re.sub(r"\[.*?\]\*", "*", alb_pattern) # replace groupings of the type "([0-9]*)" with "*" alb_pattern = re.sub(r"\([^()]*\)", "*", alb_pattern) # Ensure pattern starts with / if not alb_pattern.startswith("/"): alb_pattern = "/" + alb_pattern # Remove any double asterisks alb_pattern = alb_pattern.replace("**", "*") # Remove any trailing * alb_pattern = alb_pattern.rstrip("$") return alb_pattern