disableDetective.py [123:236]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    return detective_regions


def assume_role(aws_account_number: str, role_name: str) -> boto3.Session:
    """
    Assumes the provided role in the given account and returns a boto3 session.

    Args:
        - aws_account_number: AWS Account Number
        - role_name: Role to assume in target account

    Returns:
        boto3 Session built from the temporary credentials of the assumed role.

    Raises:
        Exception: whatever was raised while assuming the role or building
        the session; the error is logged and then re-raised unchanged.
    """
    try:
        sts_client = boto3.client('sts')

        # The partition is the second ':'-separated field of the caller's ARN;
        # it is reused so the role ARN targets the same partition.
        partition = sts_client.get_caller_identity()['Arn'].split(":")[1]

        response = sts_client.assume_role(
            RoleArn=f'arn:{partition}:iam::{aws_account_number}:role/{role_name}',
            RoleSessionName='EnableDetective'
        )

        # Build a session from the temporary STS credentials.
        credentials = response['Credentials']
        session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
    except Exception as e:
        logging.exception(f'exception: {e}')
        # Re-raise: falling through would reach `return session` with the name
        # unbound and replace the real error with an UnboundLocalError.
        raise

    # Only reached on success, so the message is never logged for a failure.
    logging.info(f"Assumed session for {aws_account_number}.")

    return session


def get_graphs(d_client: botocore.client.BaseClient) -> typing.List[str]:
    """
    Get graphs in a specified region.

    Args:
        - d_client: Detective boto3 client generated from the master session.

    Returns:
        List of graph Arns. An empty list is returned when list_graphs raises
        EndpointConnectionError (the error is logged).
    """
    try:
        response = d_client.list_graphs()
    except botocore.exceptions.EndpointConnectionError as e:
        # The exception must be bound to a name; the log message formats it.
        logging.exception(f'exception: {e}')
        return []

    # .get() with a default keeps a response without 'GraphList' from raising
    # KeyError. Each entry is a dictionary and only its 'Arn' value is kept.
    return [x['Arn'] for x in response.get('GraphList', [])]


def get_members(
    d_client: botocore.client.BaseClient, graphs: typing.List[str]
) -> typing.Tuple[typing.Dict[str, typing.Set[str]], typing.Dict[str, typing.Set[str]]]:
    """
    Get member accounts for all behaviour graphs in a region.

    Args:
        - d_client: Detective boto3 client generated from the master session.
        - graphs: List of graphs arns

    Returns:
        Two dictionaries keyed by graph arn: one with all member account ids,
        other with the ones whose status is 'INVITED' (pending to accept the
        invitation).

    Raises:
        Exception: any error raised by list_members is logged and re-raised.
    """
    def _list_all_members(g: str) -> typing.List[typing.Dict]:
        # Collect every page of list_members results for graph "g".
        member_accounts = []
        # Holds the NextToken keyword argument once a page returns one;
        # empty for the first call.
        token_tracker = {}
        while True:
            members = d_client.list_members(
                GraphArn=g, MaxResults=100, **token_tracker)
            member_accounts.extend(members['MemberDetails'])
            # A "NextToken" key means more pages remain; its absence ends the loop.
            if 'NextToken' in members:
                token_tracker['NextToken'] = members['NextToken']
            else:
                break
        return member_accounts

    try:
        # Fetch each graph's members once; both result dictionaries are
        # derived from this single materialized mapping.
        members_by_graph = {g: _list_all_members(g) for g in graphs}
    except Exception as e:
        logging.exception(f'exception when getting memebers: {e}')
        # Propagate so callers see the same failure as without the logging.
        raise

    all_ac = {g: {x['AccountId'] for x in v}
              for g, v in members_by_graph.items()}
    pending = {g: {x['AccountId'] for x in v if x['Status'] == 'INVITED'}
               for g, v in members_by_graph.items()}
    return all_ac, pending
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



enableDetective.py [129:244]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    return detective_regions


def assume_role(aws_account_number: str, role_name: str) -> boto3.Session:
    """
    Assumes the provided role in the given account and returns a boto3 session.

    Args:
        - aws_account_number: AWS Account Number
        - role_name: Role to assume in target account

    Returns:
        boto3 Session built from the temporary credentials of the assumed role.

    Raises:
        Exception: whatever was raised while assuming the role or building
        the session; the error is logged and then re-raised unchanged.
    """
    try:
        sts_client = boto3.client('sts')

        # The partition is the second ':'-separated field of the caller's ARN;
        # it is reused so the role ARN targets the same partition.
        partition = sts_client.get_caller_identity()['Arn'].split(":")[1]

        response = sts_client.assume_role(
            RoleArn=f'arn:{partition}:iam::{aws_account_number}:role/{role_name}',
            RoleSessionName='EnableDetective'
        )

        # Build a session from the temporary STS credentials.
        credentials = response['Credentials']
        session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
    except Exception as e:
        logging.exception(f'exception: {e}')
        # Re-raise: falling through would reach `return session` with the name
        # unbound and replace the real error with an UnboundLocalError.
        raise

    # Only reached on success, so the message is never logged for a failure.
    logging.info(f"Assumed session for {aws_account_number}.")

    return session


def get_graphs(d_client: botocore.client.BaseClient) -> typing.List[str]:
    """
    Get graphs in a specified region.

    Args:
        - d_client: Detective boto3 client generated from the master session.

    Returns:
        List of graph Arns. An empty list is returned when list_graphs raises
        EndpointConnectionError (the error is logged).
    """
    try:
        response = d_client.list_graphs()
    except botocore.exceptions.EndpointConnectionError as e:
        # The exception must be bound to a name; the log message formats it.
        logging.exception(f'exception: {e}')
        return []

    # .get() with a default keeps a response without 'GraphList' from raising
    # KeyError. Each entry is a dictionary and only its 'Arn' value is kept.
    return [x['Arn'] for x in response.get('GraphList', [])]


def get_members(
    d_client: botocore.client.BaseClient, graphs: typing.List[str]
) -> typing.Tuple[typing.Dict[str, typing.Set[str]], typing.Dict[str, typing.Set[str]]]:
    """
    Get member accounts for all behaviour graphs in a region.

    Args:
        - d_client: Detective boto3 client generated from the master session.
        - graphs: List of graphs arns

    Returns:
        Two dictionaries keyed by graph arn: one with all member account ids,
        other with the ones whose status is 'INVITED' (pending to accept the
        invitation).

    Raises:
        Exception: any error raised by list_members is logged and re-raised.
    """
    def _list_all_members(g: str) -> typing.List[typing.Dict]:
        # Collect every page of list_members results for graph "g".
        member_accounts = []
        # Holds the NextToken keyword argument once a page returns one;
        # empty for the first call.
        token_tracker = {}
        while True:
            members = d_client.list_members(
                GraphArn=g, MaxResults=100, **token_tracker)
            member_accounts.extend(members['MemberDetails'])
            # A "NextToken" key means more pages remain; its absence ends the loop.
            if 'NextToken' in members:
                token_tracker['NextToken'] = members['NextToken']
            else:
                break
        return member_accounts

    try:
        # Fetch each graph's members once; both result dictionaries are
        # derived from this single materialized mapping.
        members_by_graph = {g: _list_all_members(g) for g in graphs}
    except Exception as e:
        logging.exception(f'exception when getting memebers: {e}')
        # Propagate so callers see the same failure as without the logging.
        raise

    all_ac = {g: {x['AccountId'] for x in v}
              for g, v in members_by_graph.items()}
    pending = {g: {x['AccountId'] for x in v if x['Status'] == 'INVITED'}
               for g, v in members_by_graph.items()}
    return all_ac, pending
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



