in aws-cloudknox-config/ssm/CloudKnox_IAMRightsize.py [0:0]
def get_cloudknox_remediation_policies(access_token, user_arn, ck_config=None):
    """
    Request usage-based remediation policies for a user from the CloudKnox API.

    POSTs to ``/api/v2/role-policy/new`` on ``ck_config['url']`` with a 90-day
    history filter for the given identity. When the response carries an
    ``errorCode`` or has no usable ``data`` (missing, ``null`` or empty), a
    single placeholder policy named ``ck_activity_test`` is returned instead.

    Relies on the module-level names ``ck_endpoint_port``, ``curr_time`` and
    ``logger``.

    :param access_token: value sent as the ``X-CloudKnox-Access-Token`` header
    :param user_arn: identity id placed in ``identityIds`` (presumably the
        ARN of the AWS IAM user -- confirm against callers)
    :param ck_config: dict providing the ``url``, ``apiId``, ``serviceId`` and
        ``accountId`` keys
    :return cloudknox_remediated_policies: the ``data`` value of the response,
        or a one-element list holding the fallback policy
    :raises AssertionError: if ``ck_config`` is not a dict
    """
    # Raised explicitly (instead of an ``assert`` statement) so the check is
    # not stripped under ``python -O``; the exception type is unchanged.
    if not isinstance(ck_config, dict):
        raise AssertionError('ck config must be of type dict')

    headers = {
        'X-CloudKnox-Access-Token': access_token,
        'X-CloudKnox-API-Id': ck_config['apiId'],
        'X-CloudKnox-Service-Account-Id': ck_config['serviceId'],
        'X-CloudKnox-Timestamp-Millis': str(curr_time),
        'Content-Type': "application/json"
    }
    cloudknox_dict = {
        'authSystemInfo': {'id': ck_config['accountId'], 'type': 'AWS'},
        'identityType': 'USER',
        'identityIds': [user_arn],
        'aggregation': {'type': 'SUMMARY'},
        'requestParams': {'scope': None, 'resource': None, 'resources': None, 'condition': None},
        'filter': {'historyDays': 90}
    }
    payload = json.dumps(cloudknox_dict)
    logger.info(f'remediation policy request payload {payload}')

    conn = http.client.HTTPSConnection(ck_config['url'], ck_endpoint_port)
    try:
        conn.request("POST", "/api/v2/role-policy/new", payload, headers)
        data = conn.getresponse().read()
    finally:
        # Always release the socket, including when the request fails.
        conn.close()

    data_raw = data.decode("utf-8")
    logger.info(f'raw data received for remediation policy {data_raw}')
    response = json.loads(data_raw)

    # Use .get(): an error response may omit 'data' or set it to null, and
    # that must reach the fallback below instead of raising.
    cloudknox_remediated_policies = response.get('data')
    if not cloudknox_remediated_policies or response.get('errorCode'):
        default_policy = {
            'Version': '2012-10-17',
            'Statement': [{
                'Sid': 'AllowIAM',
                'Effect': 'Allow',
                'Action': ['iam:CreateRole'],
                'Resource': '*'
            }]
        }
        cloudknox_remediated_policies = [
            {'policyName': "ck_activity_test", 'policy': default_policy}
        ]
    return cloudknox_remediated_policies