server/plugins/github.py (378 lines of code) (raw):

"""Miscellaneous GitHub classes for Boxer""" import aiohttp import typing import json import plugins.projects import dateutil.parser import asyncio import time GRAPHQL_URL = "https://api.github.com/graphql" DEBUG = False # Set to True to disable GitHub API calls GITHUB_RESULTS_PER_QUERY = 40 # Number of results to gather per paginated result. # Default is 100, but due to recent issues with timeouts, # we have lowered it to 40. class GitHubOrganisation: """Simple GitHub Organization with GraphQL mixins for added speed""" login: str orgid: typing.Optional[int] api_headers: typing.Dict[str, str] teams: typing.List["GitHubTeam"] repositories: typing.List[str] def __init__( self, login: str = "apache", personal_access_token: str = "", bearer_token: str = "", ): """ Instantiate a GitHub Organization :param login: The GitHub organization to instantiate, e.g. apache :param personal_access_token: Personal access token from an org owner account :param bearer_token: If using bearer token instead of PAT, use it here """ assert ( personal_access_token or bearer_token ), "You must specify either a PAT or a bearer token!" self.login = login self.orgid = None self.teams = list() self.repositories = list() if personal_access_token: self.api_headers = {"Authorization": "token %s" % personal_access_token} else: self.api_headers = {"Authorization": "bearer %s" % bearer_token} async def get_id(self): """Fetches the organization's GitHub ID. This must be called once before membership or repository additions/deletions, to ensure we can perform these calls using database IDs only.""" if self.orgid is None: async with aiohttp.ClientSession(headers=self.api_headers) as session: url = f"https://api.github.com/orgs/{self.login}" async with session.get(url) as rv: js = await rv.json() self.orgid = js["id"] return self.orgid async def api_delete(self, url: str): if DEBUG: print("[DEBUG] DELETE", url) else: async with aiohttp.ClientSession(headers=self.api_headers) as session: async with session.delete(url) as rv: txt = await rv.text() assert rv.status == 204, f"Unexpected return code for DELETE on {url}: {rv.status}" await asyncio.sleep(1.05) return txt async def api_put(self, url: str, jsdata: typing.Optional[dict] = None): if DEBUG: print("[DEBUG] PUT", url) else: async with aiohttp.ClientSession(headers=self.api_headers) as session: async with session.put(url, json=jsdata) as rv: txt = await rv.text() assert rv.status in [200, 204], f"Unexpected return code for PUT on {url}: {rv.status}" await asyncio.sleep(1.05) return txt async def api_patch(self, url: str, jsdata: typing.Optional[dict] = None): if DEBUG: print("[DEBUG] PATCH", url) else: async with aiohttp.ClientSession(headers=self.api_headers) as session: async with session.patch(url, json=jsdata) as rv: txt = await rv.text() assert rv.status in [200, 204], f"Unexpected return code for PATCH on {url}: {rv.status}" await asyncio.sleep(1.05) return txt async def api_post(self, url: str, jsdata: typing.Optional[dict] = None): if DEBUG: print("[DEBUG] POST", url) else: async with aiohttp.ClientSession(headers=self.api_headers) as session: async with session.post(url, json=jsdata) as rv: txt = await rv.text() assert rv.status == 201, f"Unexpected retun code for POST on {url}: {rv.status}" await asyncio.sleep(1.05) return txt def get_team(self, team: str, private: bool) -> typing.Optional["GitHubTeam"]: """Locate a GitHub team and return it""" slug = f"{team}-committers" if private: slug = f"{team}-private" for xteam in self.teams: if xteam.slug == slug: return xteam return None async def rate_limit_rest(self): """Fetches the hourly REST API limit and how many uses have been expended this hour.""" async with aiohttp.ClientSession(headers=self.api_headers) as session: url = "https://api.github.com/rate_limit" async with session.get(url) as rv: js = await rv.json() return js["rate"]["limit"], js["rate"]["used"], js["rate"]["reset"] async def rate_limit_graphql(self): """Fetches the hourly GraphQL API limit and how many uses have been expended this hour.""" query = """ { rateLimit { limit cost used resetAt } } """ async with aiohttp.ClientSession(headers=self.api_headers) as session: async with session.post(GRAPHQL_URL, json={"query": query}) as rv: js = await rv.json() if not js or "data" not in js: print("Could not grab graphql data, assuming it's broken for now.") # Fake a response that makes us wait a while: # 500 used, out of 500, and it resets in 15 minutes time (time+900 secs) return 500, 500, int(time.time()+900) resets = dateutil.parser.parse(js["data"]["rateLimit"]["resetAt"]).timestamp() return js["data"]["rateLimit"]["limit"], js["data"]["rateLimit"]["used"], resets async def load_teams(self) -> typing.List["GitHubTeam"]: """Loads all GitHub teams in this organization using GraphQL""" query = """ { organization(login: "%s") { teams(first: %s, after:%s) { pageInfo { hasNextPage endCursor } edges { node { name slug databaseId members { totalCount edges { node { login } } } repositories { totalCount edges { node { name } } } } } } } } """ teams = [] async with aiohttp.ClientSession(headers=self.api_headers) as session: next_page = True after = "null" while next_page: async with session.post( GRAPHQL_URL, json={"query": query % (self.login, GITHUB_RESULTS_PER_QUERY, after)} ) as rv: js = await rv.json() for edge in js["data"]["organization"]["teams"]["edges"]: team = GitHubTeam(self, edge) total_members = edge["node"]["members"]["totalCount"] total_repos = edge["node"]["repositories"]["totalCount"] slug = edge["node"]["slug"] if total_members > 100: print( f"{slug} has {total_members} members, need to fill specifically..." ) await team.get_members() print("Filled with %d members!" % len(team.members)) if total_repos > 100: print( f"{slug} has {total_repos} repos assigned, need to fill specifically..." ) await team.get_repositories() print("Filled with %d repos!" % len(team.repos)) teams.append(team) endcursor = js["data"]["organization"]["teams"]["pageInfo"]["endCursor"] next_page = endcursor not in [None, "null"] # GraphQL is broken, look for null value here, do not trust hasNextPage if next_page: after = '"%s"' % endcursor self.teams = teams return teams async def load_repositories(self) -> typing.List[str]: """Loads all GitHub repositories in this organization using GraphQL""" query = """ { organization(login: "%s") { repositories(first: 100, after:%s, orderBy: {field: CREATED_AT, direction: ASC}) { pageInfo { hasNextPage endCursor } edges { node { name databaseId } } } } } """ repos = [] async with aiohttp.ClientSession(headers=self.api_headers) as session: next_page = True after = "null" while next_page: async with session.post( GRAPHQL_URL, json={"query": query % (self.login, after)} ) as rv: js = await rv.json() for edge in js["data"]["organization"]["repositories"]["edges"]: repo = edge['node']['name'] repos.append(repo) endcursor = js["data"]["organization"]["repositories"]["pageInfo"]["endCursor"] next_page = endcursor not in [None, "null"] # GraphQL is broken, look for null value here, do not trust hasNextPage if next_page: after = '"%s"' % endcursor self.repositories = repos return repos async def get_mfa_status(self) -> dict: """Get MFA status for all org members, return as a dict[str, bool] of people and whether 2fa is enabled""" query = """ { organization(login: "%s") { membersWithRole(first: 100, after:%s) { pageInfo { hasNextPage endCursor } edges { hasTwoFactorEnabled node { login } } } } } """ mfa = {} async with aiohttp.ClientSession(headers=self.api_headers) as session: next_page = True after = "null" while next_page: async with session.post( GRAPHQL_URL, json={"query": query % (self.login, after)} ) as rv: js = await rv.json() for user in js["data"]["organization"]["membersWithRole"]["edges"]: login = user["node"]["login"] mfa_status = user["hasTwoFactorEnabled"] mfa[login] = mfa_status next_page = js["data"]["organization"]["membersWithRole"][ "pageInfo" ]["hasNextPage"] after = ( '"%s"' % js["data"]["organization"]["membersWithRole"]["pageInfo"][ "endCursor" ] ) mfa_enabled = len([x for x in mfa.values() if x is True]) mfa_disabled = len([x for x in mfa.values() if x is False]) print("%d with 2FA, %d without!" % (mfa_enabled, mfa_disabled)) return mfa async def add_team(self, project: str, role: str = "committers") -> typing.Optional[int]: """Adds a new GitHub team to the organization""" assert self.orgid, "Parent GitHubOrganization needs a call to .get_id() prior to membership updates!" assert project, "GitHub team needs a name to be added to the organization" url = f"https://api.github.com/orgs/{self.login}/teams" data = { "name": f"{project} {role}", } txt = await self.api_post(url, jsdata=data) if txt: js = json.loads(txt) assert 'id' in js and js['id'], \ "GitHub did not respond with a Team ID for the new team, something wrong?: \n" + txt print(f"Team '{project} {role}' with ID {js['id']} created.") return js['id'] else: raise AssertionError("Github did not respond with a JSON payload!!") async def setup_teams(self, projects: typing.Dict[str, plugins.projects.Project]): """Looks for and sets up missing teams on GitHub""" for project in projects.values(): # Check if public team needs to be made committer_team = f"{project.name} committers" if project.public_repos and committer_team not in self.teams: print(f"Team '{project.name} committers' was not found on GitHub, setting it up for the first time.") teamid = await self.add_team(project.name, "committers") nodedata = { "node": { "databaseId": teamid, "slug": committer_team.replace(" ", "-"), "name": committer_team, "members": { "edges": [] }, "repositories": { "edges": [] }, }, } newteam = GitHubTeam(self, nodedata) self.teams.append(newteam) # Check if private (pmc) team needs to be made private_team = f"{project.name} private" if project.private_repos and private_team not in self.teams: print( f"Team '{project.name} private' was not found on GitHub, setting it up for the first time.") teamid = await self.add_team(project.name, "private") nodedata = { "node": { "databaseId": teamid, "slug": private_team.replace(" ", "-"), "name": private_team, "members": { "edges": [] }, "repositories": { "edges": [] }, }, } newteam = GitHubTeam(self, nodedata) self.teams.append(newteam) class GitHubTeam: org: GitHubOrganisation id: int slug: str name: str project: str type: str members: list repos: list def __init__(self, org: GitHubOrganisation, nodedata): self.org = org self.id = nodedata["node"]["databaseId"] self.slug = nodedata["node"]["slug"] self.name = nodedata["node"]["name"].lower() if " " in self.name: self.project, self.type = self.name.lower().split( " ", 1 ) # empire-db committers -> "empire-db" + "committers" else: self.project = "root" self.type = "admin" self.members = [] self.repos = [] for member in nodedata["node"]["members"]["edges"]: self.members.append(member["node"]["login"]) for repo in nodedata["node"]["repositories"]["edges"]: if repo: self.repos.append(repo["node"]["name"]) def __eq__(self, other): if isinstance(other, str): return other == self.name if not isinstance(other, GitHubTeam): return False return self.name == other.slug def __hash__(self): return self.name.lower() async def get_members(self): """Fetches all members of a team using GraphQL""" query = """ { organization (login: "%s") { team(slug: "%s") { members(first: 100, after:%s) { totalCount pageInfo { hasNextPage endCursor } edges { node { login } } } } } } """ async with aiohttp.ClientSession(headers=self.org.api_headers) as session: next_page = True after = "null" while next_page: async with session.post( GRAPHQL_URL, json={"query": query % (self.org.login, self.slug, after)}, ) as rv: js = await rv.json() for edge in js["data"]["organization"]["team"]["members"]["edges"]: login = edge["node"]["login"] if login not in self.members: self.members.append(login) next_page = js["data"]["organization"]["team"]["members"][ "pageInfo" ]["hasNextPage"] after = ( '"%s"' % js["data"]["organization"]["team"]["members"]["pageInfo"][ "endCursor" ] ) async def get_repositories(self): """Fetches all repositories belonging to a GitHub Team using GraphQL""" query = """ { organization (login: "%s") { team(slug: "%s") { repositories(first: 100, after:%s) { totalCount pageInfo { hasNextPage endCursor } edges { node { name } } } } } } """ async with aiohttp.ClientSession(headers=self.org.api_headers) as session: next_page = True after = "null" while next_page: async with session.post( GRAPHQL_URL, json={"query": query % (self.org.login, self.slug, after)}, ) as rv: js = await rv.json() for edge in js["data"]["organization"]["team"]["repositories"][ "edges" ]: if edge: reponame = edge["node"]["name"] if reponame not in self.repos: self.repos.append(reponame) next_page = js["data"]["organization"]["team"]["repositories"][ "pageInfo" ]["hasNextPage"] after = ( '"%s"' % js["data"]["organization"]["team"]["repositories"][ "pageInfo" ]["endCursor"] ) async def set_membership( self, github_ids: typing.List[str], ignore_prefix: str = "asf-ci" ) -> typing.Tuple[list, list]: """Adjusts GitHub teams according to the list provided, removing people that are a member of the team but not on the provided list, and adding those missing from the team :param github_ids: :param ignore_prefix: :return: A tuple of github IDs added and removed""" github_team = set([x for x in self.members if not x.startswith(ignore_prefix)]) asf_team = set([x for x in github_ids if not x.startswith(ignore_prefix)]) to_remove = github_team - asf_team to_add = asf_team - github_team # If we are okay, and we have people to add/remove, do so. for gh_person in to_add: await self.add_member(gh_person) for gh_person in to_remove: await self.remove_member(gh_person) return list(to_add), list(to_remove) async def add_member(self, github_id: str): """Adds a new person to this github team""" assert ( self.org.orgid ), "Parent GitHubOrganization needs a call to .get_id() prior to membership updates!" url = f"https://api.github.com/organizations/{self.org.orgid}/team/{self.id}/memberships/{github_id}" await self.org.api_put(url) async def remove_member(self, github_id: str): """Removes a new person from this github team""" assert ( self.org.orgid ), "Parent GitHubOrganization needs a call to .get_id() prior to membership updates!" url = f"https://api.github.com/organizations/{self.org.orgid}/team/{self.id}/memberships/{github_id}" await self.org.api_delete(url) async def set_repositories(self, repositories: typing.List[str]) -> typing.Tuple[list, list]: """Assigns a list of repositories to a team. Returns a tuple of repos added and removed from the team""" repos_assigned = set(self.repos) repos_to_assign = set(repositories) to_add = repos_to_assign - repos_assigned # add: all repos belonging to project, minus already assigned to_remove = repos_assigned - repos_to_assign # remove: already assigned, minus all repos belonging to project for repo in to_add: await self.add_repository(repo) for repo in to_remove: await self.remove_repository(repo) return list(to_add), list(to_remove) async def remove_repository(self, reponame: str): """Removes a single repository from the team""" assert ( self.org.orgid ), "Parent GitHubOrganization needs a call to .get_id() prior to membership updates!" url = f"https://api.github.com/organizations/{self.org.orgid}/team/{self.id}/repos/{self.org.login}/{reponame}" await self.org.api_delete(url) async def add_repository(self, reponame: str): """Adds a single repository to the team with the custom 'committers' role (write access plus can create protected tags)""" assert ( self.org.orgid ), "Parent GitHubOrganization needs a call to .get_id() prior to membership updates!" url = f"https://api.github.com/organizations/{self.org.orgid}/team/{self.id}/repos/{self.org.login}/{reponame}" await self.org.api_put(url, {'permission': 'committers'})