in gha_scanner/__init__.py [0:0]
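def handler(self, data):
    """Handle one event payload and report workflow policy findings.

    A payload without a "commit" entry is treated as a heartbeat and only
    logged. Otherwise the owning PMC is derived from the repository name,
    the repository's workflows are listed and scanned, and a report is
    sent when at least one scan returned findings.

    Args:
        data: Event payload. When it has a "commit" entry, that entry must
            provide "project"; "files" is read when present, and "hash"
            is read when no scan is triggered.

    Returns:
        None. Results are delivered through ``self.send_report`` and the
        logger.
    """
    if "commit" not in data:
        self.logger.log.info("Heartbeat Signal Detected")
        return

    commit = data["commit"]
    project = commit["project"]
    reponame = project.split("-")
    self.logger.log.debug(reponame)

    if reponame[0] == "incubator":
        # "incubator" alone or with a trailing dash ("incubator-") names no
        # podling. Fall back to the Incubator PMC rather than building
        # "private@.apache.org" from an empty host label.
        podling = reponame[1] if len(reponame) > 1 else ""
        if podling:
            proj_name = podling
            proj_mail = f"private@{podling}.apache.org"
        else:
            proj_name = "Incubator"
            proj_mail = "private@incubator.apache.org"
    else:
        try:
            proj_name = self.mail_map[reponame[0]]
        except KeyError:
            # Repository prefix is not in the map: address the Foundation.
            proj_name = "Foundation"
            proj_mail = "root@apache.org"
        else:
            proj_mail = f"private@{proj_name}.apache.org"
    self.logger.log.debug(f"Divined project email: {proj_mail}")

    message = {
        "body": [
            f"Greetings {proj_name.capitalize()} PMC!\n",
            f"The repository: {project} has been scanned.",
            "Our analysis has found that the following GitHub Actions workflows need remediation:",
        ],
        # Delivery is limited to the infra notifications list; proj_mail is
        # only logged. NOTE(review): in the incubator branch the address is
        # built straight from the repository name in the event payload, not
        # from mail_map. Validate that component as a hostname label before
        # enabling the line below.
        # "recips": ["notifications@infra.apache.org", proj_mail],
        "recips": ["notifications@infra.apache.org"],
        "subject": f"GitHub Actions workflow policy violations in {project}",
    }
    # Anything appended beyond the preamble is a finding; remembering the
    # length keeps the report decision independent of the preamble size.
    preamble_len = len(message["body"])

    p = re.compile(r"^\.github\/workflows\/.+\.ya?ml$")
    results = {}
    changed_files = commit.get("files", [])
    if self.config["full_scan"]:
        # Placeholder entry so the scan below runs whatever the commit touched.
        r = [True]
        self.logger.log.debug("Full scan enabled: scanning all workflow files")
    else:
        r = [w for w in changed_files if p.match(w)]
        self.logger.log.debug(f"found {len(r)} modified workflow files")
    self.logger.log.debug(f"{changed_files}")

    if r:
        w_list = self.list_flows(commit)
        self.logger.log.debug(f"{w_list}")
        workflows = w_list.get("workflows")
        if workflows is not None:
            self.logger.log.debug([item["path"] for item in workflows])
            for workflow in workflows:
                # Entries with an empty path are logged and skipped.
                if not workflow["path"]:
                    self.logger.log.debug(workflow)
                    continue
                self.logger.log.debug("Handling: %s" % workflow["path"])
                # scan_flow returns a (result, findings) pair.
                results[workflow["name"]], m = self.scan_flow(commit, workflow)
                if m:
                    message["body"].extend(m)
                else:
                    self.logger.log.debug(f"{workflow['path']} Passed all tests.")
        else:
            self.logger.log.info(f"No workflows found in {project}: {commit}")
    else:
        self.logger.log.info(f"Scanned {project} commit: {commit['hash']}")

    if len(message["body"]) > preamble_len:
        self.logger.log.info(
            f"Failures detected, generating message to {proj_name}..."
        )
        message["body"].extend(self.message_foot)
        self.logger.log.debug(message["subject"])
        self.send_report(message, proj_name)
    else:
        self.logger.log.debug(results)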