in servicecatalog_puppet/commands/manifest.py [0:0]
def _escaped_json(value):
    """Serialize ``value`` to JSON and backslash-escape every double quote."""
    return json.dumps(value).replace('"', '\\"')


def expand(f, puppet_account_id, single_account, subset=None):
    """Expand a manifest and write the result next to the input file.

    The manifest is loaded with ``manifest_utils.load``, expanded through the
    organizations client when an org IAM role is configured, optionally
    narrowed to one account and/or a subset, run through the
    ``manifest_utils`` rewrite passes, and finally enriched with the ``SCT*``
    parameters and a ``config_cache`` section before being dumped as YAML.

    Args:
        f: Manifest source handed to ``manifest_utils.load``. Its ``name``
            attribute is used to derive the output path (presumably an open
            file handle -- confirm against the caller).
        puppet_account_id: Account id used for configuration lookups and
            excluded from the ``SCTManifestSpokes`` parameter.
        single_account: When truthy, the manifest's ``accounts`` list is
            reduced to the account whose ``account_id`` matches (compared as
            strings). If nothing matches, the list is left unchanged.
        subset: When truthy, passed to ``manifest_utils.isolate`` to narrow
            the manifest.

    Returns:
        None. The expanded manifest is written to ``f.name`` with ``.yaml``
        replaced by ``-expanded.yaml``.
    """
    click.echo("Expanding")
    manifest = manifest_utils.load(f, puppet_account_id)
    org_iam_role_arn = config.get_org_iam_role_arn(puppet_account_id)
    if org_iam_role_arn is None:
        click.echo("No org role set - not expanding")
        new_manifest = manifest
    else:
        click.echo(f"Expanding using role: {org_iam_role_arn}")
        with betterboto_client.CrossAccountClientContextManager(
            "organizations", org_iam_role_arn, "org-iam-role"
        ) as client:
            new_manifest = manifest_utils.expand_manifest(manifest, client)
    click.echo("Expanded")

    if single_account:
        click.echo(f"Filtering for single account: {single_account}")
        wanted_account_id = str(single_account)
        for account in new_manifest.get("accounts", []):
            if str(account.get("account_id")) == wanted_account_id:
                click.echo(f"Found single account: {single_account}")
                new_manifest["accounts"] = [account]
                break
        click.echo("Filtered")

    # The rewrite passes run in this fixed order; each consumes the output of
    # the previous one.
    new_manifest = manifest_utils.rewrite_cfct(new_manifest)
    new_manifest = manifest_utils.rewrite_depends_on(new_manifest)
    new_manifest = manifest_utils.rewrite_ssm_parameters(new_manifest)
    new_manifest = manifest_utils.rewrite_stacks(new_manifest, puppet_account_id)
    new_manifest = manifest_utils.rewrite_scps(new_manifest, puppet_account_id)
    new_manifest = manifest_utils.parse_conditions(new_manifest)

    if subset:
        click.echo(f"Filtering for subset: {subset}")
        new_manifest = manifest_utils.isolate(
            manifest_utils.Manifest(new_manifest), subset
        )

    manifest_accounts_all = [
        {"account_id": a.get("account_id"), "email": a.get("email")}
        for a in new_manifest.get("accounts", [])
    ]
    # Compare ids as strings, like the single-account filter above, so an
    # int/str mismatch cannot leave the puppet account in the spokes list.
    puppet_account_id_str = str(puppet_account_id)
    manifest_accounts_excluding = [
        a
        for a in manifest_accounts_all
        if str(a.get("account_id")) != puppet_account_id_str
    ]

    # Expose the account and region lists as manifest parameters, stored as
    # JSON text with escaped double quotes.
    regions = config.get_regions(puppet_account_id)
    # Guard a missing/empty parameters section the same way the lambda
    # invocations section is guarded below.
    if new_manifest.get("parameters") is None:
        new_manifest["parameters"] = dict()
    parameters = new_manifest["parameters"]
    parameters["SCTManifestAccounts"] = dict(
        default=_escaped_json(manifest_accounts_all)
    )
    parameters["SCTManifestSpokes"] = dict(
        default=_escaped_json(manifest_accounts_excluding)
    )
    parameters["SCTConfigRegions"] = dict(default=_escaped_json(regions))
    parameters["SCTAccountId"] = dict(default=puppet_account_id)

    if new_manifest.get(constants.LAMBDA_INVOCATIONS) is None:
        new_manifest[constants.LAMBDA_INVOCATIONS] = dict()

    home_region = config.get_home_region(puppet_account_id)
    with betterboto_client.ClientContextManager("ssm") as ssm:
        response = ssm.get_parameter(Name="service-catalog-puppet-version")
        version = response.get("Parameter").get("Value")

    # Store the configuration values in the manifest itself under
    # "config_cache".
    new_manifest["config_cache"] = dict(
        home_region=home_region,
        regions=regions,
        should_collect_cloudformation_events=config.get_should_use_sns(
            puppet_account_id, home_region
        ),
        should_forward_events_to_eventbridge=config.get_should_use_eventbridge(
            puppet_account_id, home_region
        ),
        should_forward_failures_to_opscenter=config.get_should_forward_failures_to_opscenter(
            puppet_account_id, home_region
        ),
        puppet_version=version,
    )

    # NOTE(review): if f.name does not contain ".yaml" (e.g. a ".yml" file),
    # new_name equals f.name and the input is overwritten -- confirm callers
    # always pass a ".yaml" path.
    new_name = f.name.replace(".yaml", "-expanded.yaml")
    logger.info("Writing new manifest: %s", new_name)
    with open(new_name, "w") as output:
        output.write(yaml_utils.dump(new_manifest))