def _update_smus_provisioning_role()

in migration/bring-your-own-role/byor.py [0:0]


def _update_smus_provisioning_role(datazone_client, iam_client, domain_id: str, byor_role_arn: str, execute_flag: bool) -> None:
    """Grant the domain's provisioning role the permissions it needs on a BYOR role.

    The provisioning role is looked up from the managed "Tooling" environment
    blueprint configuration of the domain. Every statement in the role's
    attached AWS managed policies that targets
    ``arn:aws:iam::*:role/datazone_usr_role_*`` is copied, re-targeted at the
    BYOR role(s), and written to the role's ``byoInlinePolicy`` inline policy.
    Resources already listed in the first statement of an existing
    ``byoInlinePolicy`` are kept so previously registered roles stay covered.

    Args:
        datazone_client: Client exposing ``list_environment_blueprints`` and
            ``get_environment_blueprint_configuration``.
        iam_client: Client exposing ``list_attached_role_policies``,
            ``get_policy``, ``get_policy_version``, ``get_role_policy`` and
            ``put_role_policy``.
        domain_id: Identifier of the domain whose provisioning role is updated.
        byor_role_arn: ARN of the bring-your-own role to grant access to.
        execute_flag: When falsy, nothing is written; only a notice is printed.
    """
    inline_policy_name = 'byoInlinePolicy'

    # Find Provisioning Role of current SMUS Domain
    tooling_blueprint = datazone_client.list_environment_blueprints(
        domainIdentifier=domain_id,
        managed=True,
        name="Tooling"
    )['items'][0]
    tooling_blueprint_config = datazone_client.get_environment_blueprint_configuration(
        domainIdentifier=domain_id,
        environmentBlueprintIdentifier=tooling_blueprint['id']
    )
    # The role name is the last '/'-separated ARN segment, whether the role has
    # no path, a single path element (e.g. "service-role/") or a deeper path.
    provisioning_role_name = tooling_blueprint_config['provisioningRoleArn'].split('/')[-1]
    print(f"Updating SageMaker Unified Studio Provisioning Role \"{provisioning_role_name}\" to have necessary permissions to {byor_role_arn}...\n")

    # Collect statements from the role's AWS managed policies that grant
    # permission on datazone_usr_role_*
    managed_policies = iam_client.list_attached_role_policies(
        RoleName=provisioning_role_name
    )
    new_policy_statements_to_append = []
    for managed_policy in managed_policies['AttachedPolicies']:
        if not managed_policy['PolicyArn'].startswith('arn:aws:iam::aws:policy/'):
            continue
        policy = iam_client.get_policy(PolicyArn=managed_policy['PolicyArn'])
        policy_version = iam_client.get_policy_version(
            PolicyArn=policy['Policy']['Arn'],
            VersionId=policy['Policy']['DefaultVersionId']
        )
        for statement in policy_version['PolicyVersion']['Document']['Statement']:
            if 'Resource' not in statement:
                continue
            resources = statement['Resource'] if isinstance(statement['Resource'], list) else [statement['Resource']]
            if any('arn:aws:iam::*:role/datazone_usr_role_*' in resource for resource in resources):
                new_policy_statements_to_append.append(statement)

    # Only the lookup is guarded: a NoSuchEntityException raised by the later
    # write must not be mistaken for "inline policy does not exist yet".
    try:
        current_inline_policy = iam_client.get_role_policy(
            RoleName=provisioning_role_name,
            PolicyName=inline_policy_name
        )
    except iam_client.exceptions.NoSuchEntityException:
        current_inline_policy = None

    # Combine old BYOR roles (first statement of the existing inline policy)
    # with the new BYOR role; tolerate an empty or Resource-less policy.
    target_roles = {byor_role_arn}
    if current_inline_policy is not None:
        existing_statements = _ensure_list(current_inline_policy['PolicyDocument'].get('Statement', []))
        if existing_statements:
            target_roles.update(_ensure_list(existing_statements[0].get('Resource', [])))

    # Sorted so repeated runs produce an identical policy document.
    combined_resources = sorted(target_roles)
    for statement_to_append in new_policy_statements_to_append:
        statement_to_append['Resource'] = list(combined_resources)

    new_policy_document = {
        "Version": "2012-10-17",
        "Statement": new_policy_statements_to_append
    }

    if not execute_flag:
        print(f"Skipping update/create inline policy 'byoInlinePolicy' for role {provisioning_role_name}, set --execute flag to True to do the actual update.\n")
        return

    iam_client.put_role_policy(
        RoleName=provisioning_role_name,
        PolicyName=inline_policy_name,
        PolicyDocument=json.dumps(new_policy_document)
    )
    if current_inline_policy is not None:
        print(f"Updated existing inline policy 'byoInlinePolicy' for role {provisioning_role_name}, new policy:\n")
        pprint(new_policy_statements_to_append)
    else:
        print(f"Created new inline policy 'byoInlinePolicy' for role {provisioning_role_name}, new policy:\n")
        pprint(new_policy_document)