# gha_scanner/__init__.py
#!/usr/bin/env python3
import re
import sys
import yaml
import asfpy.messaging
import asfpy.pubsub
import argparse
import requests
import logging
import json
import smtplib
import datetime
from . import checks
class Log:
    """Wire up the module logger from the runtime configuration.

    The config dict supplies two keys used here: ``logfile`` (the string
    "stdout", or a filename to log into) and ``verbosity`` (an integer
    0-5 translated to a stdlib logging level via ``self.verbosity``).
    """

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger(__name__)
        # Integer verbosity knob -> stdlib logging level.
        self.verbosity = {
            0: logging.INFO,
            1: logging.CRITICAL,
            2: logging.ERROR,
            3: logging.WARNING,
            4: logging.INFO,
            5: logging.DEBUG,
        }
        self.stdout_fmt = logging.Formatter(
            "{asctime} [{levelname}] {funcName}: {message}", style="{"
        )
        level = self.verbosity[self.config["verbosity"]]
        self.log.setLevel(level)
        if self.config["logfile"] != "stdout":
            # File logging goes through the root logger's basic config.
            logging.basicConfig(
                format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
                filename=self.config["logfile"],
            )
        else:
            # Attach an explicit stream handler aimed at stdout.
            self.to_stdout = logging.StreamHandler(sys.stdout)
            self.to_stdout.setLevel(level)
            self.to_stdout.setFormatter(self.stdout_fmt)
            self.log.addHandler(self.to_stdout)
class Scanner:
    """Scan GitHub Actions workflows in apache/* repositories.

    Listens to the ASF pubsub commit stream, runs each (modified)
    workflow file through the policy checks defined in the ``checks``
    module, and mails a remediation report when violations are found.
    """

    def __init__(self, config):
        """Prepare the GitHub session, committee mail map and pester cache.

        Config keys used here: ``gha_token`` (GitHub API token),
        ``next_pester`` (days between repeat reports), ``full_scan``
        (consumed later in handler()), plus the keys consumed by ``Log``
        (``logfile``, ``verbosity``).
        """
        self.config = config
        self.ghurl = "https://api.github.com"
        self.s = requests.Session()
        # Committee name -> mailing-list name, from Whimsy's public JSON.
        raw_map = self.s.get(
            "https://whimsy.apache.org/public/committee-info.json"
        ).json()["committees"]
        self.mail_map = {item: raw_map[item]["mail_list"] for item in raw_map}
        self.s.headers.update({"Authorization": "token %s" % self.config["gha_token"]})
        self.pubsub = "https://pubsub.apache.org:2070/git/commit"
        self.logger = Log(config)
        # Fixed footer appended to every violation report.
        self.message_foot = [
            "\nFor more information on the GitHub Actions workflow policy, visit:",
            "\thttps://infra.apache.org/github-actions-policy.html\n",
            "Please remediate the above as soon as possible.",
            "If after 60 days these problems are not addressed, we will turn off builds",
            "\nCheers,",
            "\tASF Infrastructure",
        ]
        # Seed the cache so "infrastructure" itself is not pestered during
        # the first next_pester days.
        d = datetime.datetime.now() + datetime.timedelta(
            days=self.config["next_pester"]
        )
        self.msgcache = {"infrastructure": {"date": d}}

    def scan(self):
        """Block forever, feeding pubsub commit events to handler()."""
        self.logger.log.info("Connecting to %s" % self.pubsub)
        asfpy.pubsub.listen_forever(self.handler, self.pubsub, raw=True)

    # Fetch all workflows for the project at the given hash
    def list_flows(self, commit):
        """Return GitHub's workflow listing for commit['project'] at commit['sha'].

        Returns the decoded JSON payload, or {} on a request error.
        """
        try:
            r = self.s.get(
                "%s/repos/apache/%s/actions/workflows?ref=%s"
                % (self.ghurl, commit["project"], commit["sha"])
            )
            return r.json()
        except requests.exceptions.RequestException as e:
            self.logger.log.error(f"An error occurred: {e}")
            return {}

    # Fetch the yaml workflow from github
    def fetch_flow(self, commit, w_data):
        """Download and parse one workflow file at the commit's SHA.

        Comment-only lines are stripped before the YAML parse. Returns
        the parsed mapping, or None when the file is missing, fails to
        parse, or does not parse to a mapping.
        """
        try:
            raw_url = "https://raw.githubusercontent.com/apache"
            self.logger.log.debug(
                "Fetching %s/%s/%s/%s"
                % (raw_url, commit["project"], commit["sha"], w_data["path"])
            )
            r = self.s.get(
                "%s/%s/%s/%s"
                % (raw_url, commit["project"], commit["sha"], w_data["path"])
            )
            r_content = yaml.safe_load(
                "\n".join(
                    line
                    for line in r.content.decode("utf-8").split("\n")
                    if not re.match(r"^\s*#", line)
                )
            )
        except (KeyError, TypeError, yaml.parser.ParserError) as e:
            self.logger.log.critical(e)
            return None
        # Guard: an empty/comment-only file parses to None and plain text
        # parses to a str; neither has .keys() and neither is checkable.
        if not isinstance(r_content, dict):
            self.logger.log.error("%s did not parse to a mapping" % w_data["path"])
            return None
        self.logger.log.debug(r_content.keys())
        # A raw.githubusercontent 404 body ("404: Not Found") parses to
        # the YAML mapping {404: "Not Found"}; that is what we detect here.
        if 404 in r_content:
            self.logger.log.error(
                "%s doesn't exist in %s" % (w_data["path"], commit["hash"])
            )
            return None
        self.logger.log.debug("retrieved: %s" % w_data["path"])
        return r_content

    def scan_flow(self, commit, w_data):
        """Run every workflow policy check against one workflow.

        Returns a tuple (results, messages): results maps check name ->
        bool (False meaning the check failed) and messages is a list of
        human-readable violation lines. Returns (None, None) when the
        workflow could not be fetched/parsed.
        """
        flow_data = self.fetch_flow(commit, w_data)
        if not flow_data:
            return (None, None)
        result = {}
        m = []
        for check in checks.WORKFLOW_CHECKS:
            self.logger.log.info(
                "Checking %s:%s(%s): %s"
                % (commit["project"], w_data["name"], commit["hash"], check)
            )
            c_data = checks.WORKFLOW_CHECKS[check]["func"](flow_data)
            # All workflow checks return a bool, False if the workflow failed.
            if not c_data:
                m.append(
                    "\t"
                    + w_data["name"]
                    + ": "
                    + checks.WORKFLOW_CHECKS[check]["desc"]
                )
            result[check] = c_data
        return (result, m)

    def send_report(self, message, proj_name):
        """Mail a violation report, rate-limited per project.

        message is a dict with 'recips', 'subject' and 'body' (a list of
        strings). A project is mailed at most once per
        config['next_pester'] days; the next allowed date is tracked per
        project in self.msgcache.
        """
        if proj_name in self.msgcache:
            # BUG FIX: the cache entry is a dict, so read it with .get()
            # rather than getattr() (which always returned the default),
            # and hold off only while the next-pester date is still in
            # the FUTURE — the old comparison was inverted and caused an
            # unconditional early return for every cached project.
            def_now = datetime.datetime.now()
            np = self.msgcache[proj_name].get("date", def_now)
            self.logger.log.debug(f"{np}")
            if np > def_now:
                self.logger.log.info(
                    f"Waiting until {self.msgcache[proj_name]['date']}"
                )
                return
        # Schedule the next allowed notification for this project.
        next_pester = datetime.datetime.now() + datetime.timedelta(
            days=self.config["next_pester"]
        )
        self.msgcache[proj_name] = {"date": next_pester}
        self.logger.log.info(f"{self.msgcache}")
        try:
            self.logger.log.info(f"Sending Message to {message['recips'][-1]}")
            asfpy.messaging.mail(
                recipients=message["recips"],
                subject=message["subject"],
                message="\n".join(message["body"]),
            )
        except smtplib.SMTPRecipientsRefused as e:
            self.logger.log.error(f"An error has occurred: {e}")

    def handler(self, data):
        """Pubsub callback: scan one commit event and report violations.

        Payloads without a "commit" key are treated as heartbeats and
        ignored. The owning PMC's private list is divined from the repo
        name, though reports currently go to notifications@infra only.
        """
        if "commit" not in data:
            self.logger.log.info("Heartbeat Signal Detected")
            return
        reponame = data["commit"]["project"].split("-")
        self.logger.log.debug(reponame)
        proj_name = None
        proj_mail = None
        if reponame[0] == "incubator":
            try:
                proj_mail = f"private@{reponame[1]}.apache.org"
                proj_name = reponame[1]
            except IndexError:
                # Repository is literally named "incubator".
                proj_mail = "private@incubator.apache.org"
                proj_name = "Incubator"
        else:
            try:
                proj_mail = f"private@{self.mail_map[reponame[0]]}.apache.org"
                proj_name = self.mail_map[reponame[0]]
            except KeyError:
                # Unknown committee; fall back to the Foundation.
                proj_mail = "root@apache.org"
                proj_name = "Foundation"
        self.logger.log.debug(f"Divined project email: {proj_mail}")
        message = {
            "body": [
                f"Greetings {proj_name.capitalize()} PMC!\n",
                f"The repository: {data['commit']['project']} has been scanned.",
                "Our analysis has found that the following GitHub Actions workflows need remediation:",
            ],
            # "recips": ["notifications@infra.apache.org", proj_mail],
            "recips": ["notifications@infra.apache.org"],
            "subject": f"GitHub Actions workflow policy violations in {data['commit']['project']}",
        }
        p = re.compile(r"^\.github\/workflows\/.+\.ya?ml$")
        results = {}
        if not self.config["full_scan"]:
            # Only scan when the commit actually touched a workflow file.
            r = [w for w in data["commit"].get("files", []) if p.match(w)]
            self.logger.log.debug("found %s modified workflow files" % len(r))
            self.logger.log.debug(f"{data['commit'].get('files', [])}")
        else:
            # Truthy sentinel so the scan below always runs.
            r = [True]
            self.logger.log.debug("Full scan enabled: scanning all workflow files")
            self.logger.log.debug(f"{data['commit'].get('files', [])}")
        if r:
            w_list = self.list_flows(data["commit"])
            self.logger.log.debug(f"{w_list}")
            if "workflows" in w_list and w_list["workflows"] is not None:
                self.logger.log.debug(
                    [item["path"] for item in w_list["workflows"]]
                )
                for workflow in w_list["workflows"]:
                    # Handle the odd ''
                    if not workflow["path"]:
                        self.logger.log.debug(workflow)
                        continue
                    self.logger.log.debug("Handling: %s" % workflow["path"])
                    results[workflow["name"]], m = self.scan_flow(
                        data["commit"], workflow
                    )
                    if m:
                        message["body"].extend(m)
                    else:
                        self.logger.log.debug(
                            f"{workflow['path']} Passed all tests."
                        )
            else:
                self.logger.log.info(
                    f"No workflows found in {data['commit']['project']}: {data['commit']}"
                )
        else:
            self.logger.log.info(
                f"Scanned {data['commit']['project']} commit: {data['commit']['hash']}"
            )
        # The body is seeded with three greeting lines; anything beyond
        # that is a violation line appended by scan_flow().
        if len(message["body"]) >= 4:
            self.logger.log.info(
                f"Failures detected, generating message to {proj_name}..."
            )
            message["body"].extend(self.message_foot)
            self.logger.log.debug(message["subject"])
            self.send_report(message, proj_name)
        else:
            self.logger.log.debug(results)