in src/access-analyzer/analyze.py [0:0]
def get_policy_analyzer_project(project_id):
    """Collect last-authentication data for service account keys in a project.

    Queries the Policy Analyzer ``serviceAccountKeyLastAuthentication``
    activity type for ``project_id``, following ``nextPageToken`` until no
    further page is advertised, and merges in whatever
    ``get_recommendations`` returns for the owning service account.

    Args:
        project_id: Project identifier interpolated into the request URL.

    Returns:
        A list of dicts, one per reported activity, holding ``keyId``,
        ``keyLastUse`` and the ``recommender*`` /
        ``associatedRecommendation`` fields. The latter stay ``None`` unless
        ``get_recommendations`` returns values that overwrite them.
    """
    authed_session = AuthorizedSession(cred)
    # Built once; the same endpoint serves the first and every follow-up page.
    query_url = (
        f"https://policyanalyzer.googleapis.com/v1/projects/{project_id}/"
        + "locations/global/activityTypes/serviceAccountKeyLastAuthentication/activities:query"
    )
    keyactivity = []

    def process_keyactivity(response):
        """Append one record per activity in ``response`` to ``keyactivity``."""
        # Empty payloads and payloads carrying an "error" key add nothing.
        if not response or "error" in response:
            return
        # A page may lack "activities" (e.g. it only carries a page token);
        # treat a missing or null list as empty instead of raising KeyError.
        for activity in response.get("activities") or []:
            # NOTE(review): indices 6 and 8 assume a resource name shaped like
            # //iam.googleapis.com/projects/<p>/serviceAccounts/<email>/keys/<id>
            # -- confirm against the API output.
            name = activity["fullResourceName"].split("/")
            sa_email = name[6]
            # Tolerate an entry without the nested "activity" object; the
            # timestamp is optional either way.
            activity_details = activity.get("activity") or {}
            sa_data = {
                "keyId": name[8],
                "keyLastUse": activity_details.get("lastAuthenticatedTime"),
                "recommenderSubtype": None,
                "recommenderDescription": None,
                "recommenderRevokedIamPermissionsCount": None,
                "recommenderPriority": None,
                "associatedRecommendation": None,
            }
            # Looked up per key, so a service account with several keys is
            # queried once per key.
            recommendations = get_recommendations(project_id, sa_email)
            if recommendations:
                sa_data.update(recommendations)
            keyactivity.append(sa_data)

    # First request goes out without a page token; each later request passes
    # the token returned by the previous page.
    params = None
    while True:
        response = authed_session.get(query_url, params=params).json()
        process_keyactivity(response)
        if "nextPageToken" not in response:
            break
        params = {"pageToken": response["nextPageToken"]}
    return keyactivity