in aws-cloudknox-config/configrule/CLOUDKNOX_PCI.py [0:0]
def evaluate_compliance(event, configuration_item, valid_rule_parameters):
    """Form the evaluation(s) to be returned to Config Rules.

    Lists every IAM user known to AWS Config, retrieves the CloudKnox PCI
    results via retrievePCI(), and builds one evaluation per CloudKnox user
    whose name matches a discovered IAM user. A user whose PCI category is
    'HIGH' is reported NON_COMPLIANT; any other category is reported
    COMPLIANT. The annotation carries the user's privilegeCreepIndex.

    Return either:
    a list of dictionary -- one evaluation dictionary per matched IAM user, built by build_evaluation()
    a string -- 'NOT_APPLICABLE' when no CloudKnox user matched a discovered IAM user

    Keyword arguments:
    event -- the event variable given in the lambda handler
    configuration_item -- the configurationItem dictionary in the invokingEvent (not used by this rule)
    valid_rule_parameters -- the output of the evaluate_parameters() representing validated parameters of the Config Rule (not used by this rule)

    Advanced Notes:
    1 -- if a resource is deleted and generates a configuration change with ResourceDeleted status, the Boilerplate code will put a NOT_APPLICABLE on this resource automatically.
    2 -- if a None or a list of dictionary is returned, the old evaluation(s) which are not returned in the new evaluation list are returned as NOT_APPLICABLE by the Boilerplate code
    3 -- if None or an empty string, list or dict is returned, the Boilerplate code will put a "shadow" evaluation to feedback that the evaluation took place properly
    """
    ## Initialize AWS Config
    config_client = boto3.client('config')

    ## Get a list of all IAM users from AWS Config
    discovered_users = fetch_all_items(config_client,
                                       config_client.list_discovered_resources,
                                       'resourceIdentifiers',
                                       resourceType='AWS::IAM::User',
                                       limit=100)

    ## Map each IAM user name to its AWS Config resource id.
    ## A dict gives a constant-time lookup per CloudKnox user instead of
    ## scanning two parallel lists.
    user_id_by_name = {}
    for identifier in discovered_users:
        resource_id = identifier['resourceId']
        print('userIDinlist: ' + resource_id)
        # resourceName is optional in the Config API response; a resource
        # without a name can never be matched to a CloudKnox user, so skip it
        # rather than fail the whole evaluation with a KeyError.
        resource_name = identifier.get('resourceName')
        if resource_name is None:
            continue
        print('userNameinlist: ' + resource_name)
        # setdefault keeps the first id seen for a name, matching what
        # list.index() returned in the previous implementation.
        user_id_by_name.setdefault(resource_name, resource_id)

    ## CloudKnox Details in Secrets Manager
    ## Any key absent from the secret falls back to an empty string.
    secrets = json.loads(get_secret_value('CloudKnoxSecretString'))
    service_id = secrets.get('serviceId', "")
    api_id = secrets.get('apiId', "")
    access_key = secrets.get('accessKey', "")
    secret_key = secrets.get('secretKey', "")
    account_id = secrets.get('accountId', "")
    url = secrets.get('url', "")

    ## Current time in milliseconds, passed as a string to the CloudKnox helpers
    timestamp = str(int(round(time.time() * 1000)))

    ## Define annotations for evaluation
    compliance_prefix = "User has a CloudKnox PCI score of: "

    https_port = 443
    access_token = getAccessToken(service_id, timestamp, access_key, secret_key, url, https_port)
    # SECURITY: never write the token itself to the log -- Lambda stdout is
    # persisted and the token is a credential for the CloudKnox API.
    print('accessToken retrieved')
    user_results = retrievePCI(api_id, access_token, service_id, timestamp, url, account_id, https_port)

    ## Build one evaluation per CloudKnox user that is also a discovered IAM user
    evaluations = []
    for user in user_results:
        resource_id = user_id_by_name.get(user['name'])
        if resource_id is None:
            # CloudKnox reported a user that AWS Config does not know about.
            continue

        pci_score = user['pciScore']
        if pci_score['category'] == 'HIGH':
            compliance_type = 'NON_COMPLIANT'
        else:
            compliance_type = 'COMPLIANT'

        annotation = build_annotation(compliance_prefix + str(pci_score['privilegeCreepIndex']))
        evaluations.append(build_evaluation(resource_id,
                                            compliance_type,
                                            event,
                                            annotation=annotation))

    if evaluations:
        return evaluations
    return 'NOT_APPLICABLE'