gha-allowlist-manager.py (121 lines of code) (raw):
#!/usr/bin/env python3
import re
import sys
import yaml
import asfpy.messaging
import asfpy.pubsub
import argparse
import requests
import logging
import json
import smtplib
ORG = "apache"
PUBLIC_INTERFACE = "infrastructure-actions"
APPROVED_PATTERNS_FILEPATH = "approved_patterns.yml"
GITHUB_OWNED_ALLOWED = True
VERIFIED_ALLOWED = True
github_timewait = 60
class Log:
def __init__(self, config):
self.config = config
self.log = logging.getLogger(__name__)
self.verbosity = {
0: logging.INFO,
1: logging.CRITICAL,
2: logging.ERROR,
3: logging.WARNING,
4: logging.INFO,
5: logging.DEBUG,
}
self.stdout_fmt = logging.Formatter(
"{asctime} [{levelname}] {funcName}: {message}", style="{"
)
if self.config["logfile"] == "stdout":
self.to_stdout = logging.StreamHandler(sys.stdout)
self.to_stdout.setLevel(self.verbosity[self.config["verbosity"]])
self.to_stdout.setFormatter(self.stdout_fmt)
self.log.setLevel(self.verbosity[self.config["verbosity"]])
self.log.addHandler(self.to_stdout)
else:
self.log.setLevel(self.verbosity[self.config["verbosity"]])
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
filename=self.config["logfile"],
)
class AllowlistUpdater:
"""Scans pubsub for changes to a defined allowlist, and Handles the API requests to GitHub"""
def __init__(self, config):
self.config = config
self.logger = Log(config)
self.action_url = (
f"https://api.github.com/orgs/{ORG}/actions/permissions/selected-actions"
)
self.raw_url = f"https://raw.githubusercontent.com/{ORG}/{PUBLIC_INTERFACE}/refs/heads/main/{APPROVED_PATTERNS_FILEPATH}"
self.s = requests.Session()
# Fetch the mail map
self.logger.log.info("Building mail alias map")
self.mail_map = {}
raw_map = self.s.get(
"https://whimsy.apache.org/public/committee-info.json"
).json()["committees"]
[self.mail_map.update({item: raw_map[item]["mail_list"]}) for item in raw_map]
# Add the GitHub Headers
self.s.headers.update(
{
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {self.config['gha_token']}",
"X-GitHub-Api-Version": "2022-11-28",
}
)
self.pubsub = f"https://pubsub.apache.org:2070/git/{PUBLIC_INTERFACE}"
def scan(self):
self.logger.log.info("Connecting to %s" % self.pubsub)
asfpy.pubsub.listen_forever(self.handler, self.pubsub, raw=True)
def update(self, wlist):
"""Update the GitHub actions allowlist for the org"""
self.logger.log.debug(wlist)
data = {
"github_owned_allowed": GITHUB_OWNED_ALLOWED,
"verified_allowed": VERIFIED_ALLOWED,
"patterns_allowed": wlist,
}
r = self.s.put(f"{self.action_url}", data=json.dumps(data))
if r.status_code == 204:
self.logger.log.info("Updated the global approved patterns list.")
else:
self.logger.log.error(f"Request returned: {r.status_code}")
self.logger.log.error("There was a failure to update the GH Org")
def handler(self, data):
if "commit" in data:
if data["commit"]["project"] == PUBLIC_INTERFACE:
self.logger.log.debug(data)
# Check if modified files are in path
# p = re.compile(r"^{}$".format(APPROVED_PATTERNS_FILEPATH))
# results = [w for w in data["commit"].get("files", []) if w == APPROVED_PATTERNS_FILEPATH)]
if APPROVED_PATTERNS_FILEPATH in data["commit"].get("files", []):
self.logger.log.debug("Updated allowlist detected")
wlist = yaml.safe_load(self.s.get(self.raw_url).content.decode("utf-8"))
self.update(wlist)
else:
self.logger.log.info("Heartbeat Signal Detected")
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config",
help="Configuration file",
default="gha-allowlist-manager.yaml",
)
parser.add_argument(
"--force-update",
help="Configuration file",
action="store_true",
default="False",
)
args = parser.parse_args()
setattr(args, "uri", "orgs/apache/actions/permissions/selected-actions")
return args
if __name__ == "__main__":
args = get_args()
config = yaml.safe_load(open(args.config, "r").read())
w = AllowlistUpdater(config)
if args.force_update is True:
w.logger.log.info(
f"Fetching approved patterns from: {PUBLIC_INTERFACE}/{APPROVED_PATTERNS_FILEPATH} "
)
wlist = yaml.safe_load(w.s.get(w.raw_url).content.decode("utf-8"))
w.update(wlist)
else:
w.scan()