converter.py (216 lines of code) (raw):

#!/usr/bin/python3 import os import yaml import requests import sys import re import logging import argparse import datetime DEFAULT_EXPIRATION_DATE = datetime.datetime(2050, 1, 1).date() def get_args(): parser = argparse.ArgumentParser("GitHub Actions Approved Patterns Converter") parser.add_argument('--ghtoken', required=True, help="GitHub Token") parser.add_argument('--dhtoken', required=True, help="DockerHub Token") parser.add_argument('-f', '--filename', default="approved_patterns.yml", help="DockerHub Token") parser.add_argument('-v','--verbose', action='count', default=0, help="Verbosity") args = parser.parse_args() return args class Log: def __init__(self, config): self.config = config self.log = logging.getLogger(__name__) self.verbosity = { 0: logging.INFO, 1: logging.CRITICAL, 2: logging.ERROR, 3: logging.WARNING, 4: logging.INFO, 5: logging.DEBUG, } self.stdout_fmt = logging.Formatter( "{asctime} [{levelname}] {funcName}: {message}", style="{" ) if self.config["logfile"] == "stdout": self.to_stdout = logging.StreamHandler(sys.stdout) self.to_stdout.setLevel(self.verbosity[self.config["verbosity"]]) self.to_stdout.setFormatter(self.stdout_fmt) self.log.setLevel(self.verbosity[self.config["verbosity"]]) self.log.addHandler(self.to_stdout) else: self.log.setLevel(self.verbosity[self.config["verbosity"]]) logging.basicConfig( format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s", filename=self.config["logfile"], ) class Converter: # Handles Converting the allowed_patterns list to an actions.yml that can be consumed by our workflow def __init__(self, args): self.allowlist = {} self.logger = Log( { "logfile": "stdout", "verbosity": args.verbose, } ) # GitHub Session Handler self.gh = requests.Session() self.gh.headers.update( { "Accept": "application/vnd.github+json", "Authorization": f"Bearer {args.ghtoken}", "X-GitHub-Api-Version": "2022-11-28", } ) # DockerHub Session Handler self.dh = requests.Session() self.dh.headers.update( { "Authorization": f"Bearer {args.dhtoken}", "Accept": "application/json", } ) def gh_fetch(self, uri): self.logger.log.debug(f"Fetching {uri}...") try: data = self.gh.get(uri) self.logger.log.debug(data) data.raise_for_status() except: self.logger.log.error(f"{uri} Returned 404!") return None data = yaml.safe_load(data.content.decode("utf-8")) if isinstance(data, list): return data else: print(data["status"]) def dh_fetch(self, uri): self.logger.log.debug(f"Fetching {uri}...") try: data = self.dh.get(uri) data.raise_for_status() except: self.logger.log.error(f"{uri} Failed!!!") return None data = yaml.safe_load(data.content.decode("utf-8"))["results"] if isinstance(data, list): return data else: print(data) def build_dh_action(self, action, tag): self.logger.log.info(f"Fetching tags for Dockerhub://{action}") dh_uri = f"https://registry.hub.docker.com/v2/repositories/" tags = self.dh_fetch(f"{dh_uri}/{action}/tags/?page_size=100") t = {} if "*" in tag: # set to the newest tagged image self.logger.log.info("Pinning DockerHub Image to newest Tagged Image") sha = max(tags, key=lambda x: x["name"])["digest"] else: self.logger.log.info(f"Ensuring {tag} is a valid tag") tl = [t["sha"] for t in tags if t["name"] == tag] if len(tl) == 0: sha = None self.logger.log.error( f"Tag: {tag} is not found in {action} tags! Skipping..." ) else: sha = tl[0] if sha: t[sha] = {"expires_at": DEFAULT_EXPIRATION_DATE} else: return None return t def build_gh_action(self, action, tag): self.logger.log.info(f"Fetching Details on {action}") gh_uri = f"https://api.github.com/repos/{action}" tags = self.gh_fetch(f"{gh_uri}/git/refs/tags") heads = self.gh_fetch(f"{gh_uri}/git/refs/heads") if tags and heads: nick = None t = {} self.logger.log.info(f"Parsing: {action}@{tag}") if "*" in tag: if len(tag.split()) > 1: self.logger.log.info("Keeping globs around for now...") sha = tag # We need to keep the globs, don't set to HEAD else: # if globbed, set to hash of HEAD. # self.logger.log.info("Pinning to the SHA of the current HEAD") self.logger.log.info("Keeping globs around for now...") #sha = max(tags, key=lambda x: x["ref"])["object"]["sha"] sha = "*" elif tag == "latest": # set to the hash of 'refs/heads/latest' self.logger.log.info("Pinning to the SHA of refs/heads/latest") sha = [item for item in heads if item["ref"] == "refs/heads/latest"][0][ "object" ]["sha"] elif len(tag) == 40: # Lets pretend for now that any 40 character string is a SHA # TODO Validate that the 40 character string is a valid SHA self.logger.log.critical( "Pretending that any 40 character string is a SHA..." ) sha = tag else: # Check if the provided tag is valid, if so use it. nick = tag self.logger.log.info(f"Pinning to the SHA of refs/heads/{tag}") sha = next( ( item["object"]["sha"] for item in tags if item["ref"] == f"refs/tags/{tag}" ), None, ) if sha is None: self.logger.log.error( f"Tag: {tag} not found in https://api.github.com/repos/{action}/git/refs/tags" ) self.logger.log.error(f"Skipping {action}@{tag}") return None # Expiration Date set as a GLOBAL t[sha] = {"expires_at": f"{DEFAULT_EXPIRATION_DATE}"} # TODO Don't keep globs # Keep the globs for 6 months if sha == "*": t[sha]["keep"] = True t[sha]["expires_at"] = DEFAULT_EXPIRATION_DATE # TODO Don't keep tags # Keep tags as nicknames for their associated version for 6 months if nick: t[nick] = {"expires_at": DEFAULT_EXPIRATION_DATE} return t def parse_approved_patterns(self, file): allowlist = {} allowed = yaml.safe_load(file) for ap in allowed: # Do some work to make sure that the names are right first, _then_ parse the tags a = ap.split("/") # Parse Docker things first if a[0] == "docker:": # action = self.build_dh_action(ap) self.logger.log.critical("Parsing DockerHub entry") dkey, image, tag = ap.split(":") act = image.lstrip("//") action = self.build_dh_action(act, tag) # reset the action name to include the docker key `docker://` act = "://".join([dkey, act]) # If it's not Docker it's GitHub else: # %s/%s if len(a) == 2: org = a[0] if a[1] != "*": repo = a[1] else: self.logger.log.critical( f"Invalid Entry (No repo provided): {ap}" ) continue # %s, should not happen elif len(a) == 1: print(a) # %s/%s/%s trunc'd to %s/%s elif len(a) >= 3: org = a[0] repo = a[1] act = f"{org}/{repo}" if "@" in act: act, tag = act.split("@") else: # In this case * is equivalent to HEAD of the default branch tag = "*" action = self.build_gh_action(act, tag) if action: # Update the allowlist if act in self.allowlist: allowlist[act].update(action) else: allowlist[act] = action return allowlist if __name__ == "__main__": args = get_args() c = Converter(args) c.logger.log.info("Parsing {FILENAME}") converted = c.parse_approved_patterns(open(args.filename)) c.logger.log.info("Printing Generated actions.yml to file") with open("actions.yaml", "w+") as f: yaml.safe_dump(converted, f, default_flow_style=False) f.close() c.logger.log.info("Done!")