tools/iam-recommender-at-scale/common.py (223 lines of code) (raw):
"""Common utils to support apply and reverting recommendations on bulk."""
import collections
from concurrent import futures
import json
import logging
import time
from google_auth_httplib2 import AuthorizedHttp
import httplib2
from google.oauth2 import service_account
class Recommendation(object):
"""Encapsulate Recommendation information required to compute hero metrics."""
def __init__(self, data):
self.name = data["name"]
self.etag = data["etag"]
self.state = self.get_state(data)
self.principal = set()
self.principal_type = ""
self.remove_role = set()
self.add_roles = set()
self.resource = set()
self.extract_recommendation(data)
self.check_integrity()
self.update_data()
def __repr__(self):
return repr(
(self.state, self.principal, self.principal_type, self.remove_role,
self.add_roles, self.resource, self.name, self.etag))
def get_state(self, data):
"""Get state of the recommendation."""
if data["stateInfo"]["state"] == "ACTIVE":
return "ACTIVE"
elif data["stateInfo"]["state"] == "SUCCEEDED":
if ("reverted" in data["stateInfo"].get("stateMetadata", {}) and
data["stateInfo"]["stateMetadata"].get("reverted",
"false") == "true"):
return "SUCCEEDED_REVERTED"
else:
return "SUCCEEDED"
return data["stateInfo"]["state"]
def extract_recommendation(self, data):
"""Populate recommendation data from a recommendation payload."""
for op_grps in data.get("content", {}).get("operationGroups", []):
for op in op_grps["operations"]:
if op["action"] == "remove":
self.principal.add(
op["pathFilters"]["/iamPolicy/bindings/*/members/*"])
self.resource.add(op["resource"])
self.remove_role.add(
op["pathFilters"]["/iamPolicy/bindings/*/role"])
elif op["action"] == "add":
self.resource.add(op["resource"])
self.add_roles.add(
op["pathFilters"]["/iamPolicy/bindings/*/role"])
self.principal.add(op["value"])
else:
raise ValueError("Wrong action : " + op["action"])
def check_integrity(self):
"""Check invariance of a recommendation payload."""
assert len(
self.principal
) == 1, "there should be exactly one principal. principal : " + str(
self.principal)
assert len(
self.remove_role
) == 1, "there should be exactly one removed role. remove_role: " + str(
self.remove_role)
assert len(
self.resource
) == 1, "there should be exactly one resource. resource: " + str(
self.resource)
def update_data(self):
"""Update recommendation data after checking the integrity."""
self.principal = self.principal.pop()
self.principal_type = self.principal.split(":")[0]
self.resource = self.resource.pop()
def rate_limit_execution(f, rate_limit, *args):
"""Execute multiple threads of function f for args while respecting the rate limit.
Args:
f: function to execute
rate_limit: rate with which the functions should be executed.
*args: Args provided for executing the function f.
Returns:
Output of executing f on args
"""
i = 0
n = len(args[0])
all_output = []
max_request, duration = rate_limit
while i < n:
tic = int(time.time())
with futures.ThreadPoolExecutor(max_workers=max_request) as executor:
output_ = executor.map(f, *[arg[i:i + max_request] for arg in args])
i += max_request
all_output.extend(output_)
toc = int(time.time())
diff = toc - tic
if diff < duration and i < n:
time.sleep(duration - diff)
logging.info("Finish investigating %d items out of total %d items.",
min(i, n), n)
return all_output
def get_recommendations(project_id, recommender, state, credentials):
"""Returns all recommendtions.
Args:
project_id: (str) Project for which to get the recommendtion.
recommender: Recommender stub to call recommender API
state: state of the recommendation
credentials: client credentials
"""
http = httplib2.Http()
authorize_http = AuthorizedHttp(credentials, http=http)
parent = "projects/{}/locations/global/recommenders/google.iam.policy.Recommender".format(
project_id)
fields = [
"recommendations/stateInfo/state", "recommendations/content",
"recommendations/etag", "recommendations/name",
"recommendations/stateInfo/stateMetadata"
]
try:
request = recommender.projects().locations().recommenders(
).recommendations().list(parent=parent, fields=",".join(fields))
response = request.execute(http=authorize_http)
recommendation_data = [
Recommendation(r) for r in response.get("recommendations", [])
]
return [r for r in recommendation_data if r.state == state]
except:
return []
def update_recommendation_status(recommendation, recommender_client, metadata,
credentials):
"""Update the recommendation status for the recommendations.
Args:
recommendation: Recommendation on IAM policy.
recommender_client: Iam recommender client.
metadata: (Dict) metadata to update the recommendation state.
credentials: service account credentials.
Returns:
Recommendations with updated status.
"""
http = httplib2.Http()
authorize_http = AuthorizedHttp(credentials, http=http)
return (recommender_client.projects().locations().recommenders().
recommendations().markSucceeded(name=recommendation["id"],
body={
"etag": recommendation["etag"],
"stateMetadata": metadata
}).execute(http=authorize_http))
def get_current_policy(resourcemanager_v1, project_id, credentials):
"""Returns the current policy associated with project_id.
Args:
resourcemanager_v1: ResourcemanagerV1 stub to call IAM API
project_id: (str) Project for which to get the recommendtion.
credentials: client credentials
"""
http = httplib2.Http()
authorize_http = AuthorizedHttp(credentials, http=http)
request = resourcemanager_v1.projects().getIamPolicy(resource=project_id)
cur_policy = request.execute(http=authorize_http)
del cur_policy["etag"]
return cur_policy
def update_policy(resourcemanager_v1, project_id, credentials, new_policy):
"""Returns the new policy associated with project_id.
Args:
resourcemanager_v1: ResourcemanagerV1 stub to call IAM API
project_id: (str) Project for which to get the recommendtion.
credentials: client credentials
new_policy: New policy to set on the project
"""
http = httplib2.Http()
authorize_http = AuthorizedHttp(credentials, http=http)
set_policy_request = resourcemanager_v1.projects().setIamPolicy(
resource=project_id, body={"policy": new_policy})
return set_policy_request.execute(http=authorize_http)
def get_credentials(service_account_file_path, scopes=None):
"""Returns credentials from a service_account_file_path.
Args:
service_account_file_path: (str) Path to service account key.
scopes: List scopes for service account
"""
if scopes is None:
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
return service_account.Credentials.from_service_account_file(
service_account_file_path, scopes=scopes)
def diff_between_policies(old_policy, new_policy):
"""Returns the difference between two policies.
Args:
old_policy: Old policy
new_policy: New policy
"""
old_bindings = collections.defaultdict(set)
for b in old_policy["bindings"]:
if "condition" in b:
continue
for principal in b["members"]:
old_bindings[principal].add(b["role"])
new_bindings = collections.defaultdict(set)
for b in new_policy["bindings"]:
if "condition" in b:
continue
for principal in b["members"]:
new_bindings[principal].add(b["role"])
all_principals = {*old_bindings.keys(), *new_bindings.keys()}
entries = []
for principal in sorted(all_principals):
new_roles = new_bindings[principal]
old_roles = old_bindings[principal]
if new_roles == old_roles:
continue
removed_roles = old_roles - new_roles
added_roles = new_roles - old_roles
entry = {
"principal": principal,
"removed_roles": list(removed_roles),
"added_roles": list(added_roles)
}
entries.append(entry)
return json.dumps({"diff_policy": entries}, sort_keys=True, indent=4)
def remove_role_from_policy(policy, recommendation):
"""Remove roles for a policy based on recommendations.
Args:
policy: IAM policy.
recommendation: Recommendation on IAM policy.
Returns:
None. Change the policy in place.
"""
is_acted_recommendation = False
acted_and_succeeded = False
if not recommendation["role_recommended_to_be_removed"]:
return True # No role to be removed.
for binding in policy["bindings"]:
if binding["role"] not in recommendation[
"role_recommended_to_be_removed"]:
continue
if "condition" in binding:
continue
try:
is_acted_recommendation = True
binding["members"].remove(recommendation["principal"])
recommendation["role_recommended_to_be_removed"].remove(
binding["role"])
acted_and_succeeded = True
except:
logging.error("`%s` does not have `role:%s`.",
recommendation["principal"],
recommendation["role_recommended_to_be_removed"])
if not is_acted_recommendation:
logging.error("`%s` does not have `role:%s`.",
recommendation["principal"],
recommendation["role_recommended_to_be_removed"])
return is_acted_recommendation and acted_and_succeeded
def add_roles_in_policy(policy, recommendation):
"""Add roles in the policy based on recommendations.
Args:
policy: IAM policy.
recommendation: Recommendation on IAM policy.
Returns:
None. Change the policy in place.
"""
is_acted_recommendation = False
roles_to_be_added = set(
recommendation["roles_recommended_to_be_replaced_with"])
for binding in policy["bindings"]:
if binding["role"] not in roles_to_be_added:
continue
if "condition" in binding:
continue
binding["members"].append(recommendation["principal"])
roles_to_be_added.remove(binding["role"])
for role in roles_to_be_added:
policy["bindings"].append({
"role": role,
"members": [recommendation["principal"]]
})
is_acted_recommendation = True
return is_acted_recommendation
def writefile(data, output_file):
with open(output_file, "w") as f:
f.write(data)
def describe_recommendations(recommendations):
"""Returns a json string representation of recommendation with selected fileds.
Args:
recommendations: List(common.Recommendation)
"""
recommendations_sorted = sorted(recommendations, key=lambda x: x.principal)
data = []
for r in recommendations_sorted:
data.append({
"id": r.name,
"etag": r.etag,
"principal": r.principal,
"role_recommended_to_be_removed": list(r.remove_role),
"roles_recommended_to_be_replaced_with": list(r.add_roles)
})
return json.dumps({"recommendations": data}, indent=4, sort_keys=True)