crashclouseau/hgauthors.py (339 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import json
import re
from .logger import logger
import unicodedata
from validate_email import validate_email
PATS = [
(
re.compile(r"^([\w\t ’\'\.\-]+)\[?<+([^@>]+@[^>]+)>?$", re.UNICODE),
[2, 1, 0],
), # foo bar <...@...>
(
re.compile(r"^\"([\w\t ’\'\.\-]+)\"[\t ]*\[?<+([^@>]+@[^>]+)>?$", re.UNICODE),
[2, 1, 0],
), # "foo bar" <...@...>
(re.compile(r"^<([^@>]+@[^>]+)>?$", re.UNICODE), [1, 0, 0]), # <...@...>
(
re.compile(
r"^([\w\t ’\'\.\-]+)[\[\(]:?([^\)\]]+)[\]\)][\"\t ]*[\(<]([^@>]+@[^>]+)[\)>]?$",
re.UNICODE,
),
[3, 1, 2],
), # foo bar (:toto) <...@...>
(
re.compile(r"^([\w\t ’\'\.\-]+)\(([^@\)>]+@[^\)>]+)[\)>]?$", re.UNICODE),
[2, 1, 0],
), # foo bar (...@...)
(re.compile(r"^([^@\t ]+@[^\t ]+)$", re.UNICODE), [1, 0, 0]), # ...@...
(
re.compile(r"^([\w\t ’\'\.\-]+)<([\w\t \.\+\-]+)>$", re.UNICODE),
[0, 1, 2],
), # foo bar <toto>
(re.compile(r"^<([\w\t ’\'\.\+]+)>$", re.UNICODE), [0, 0, 1]), # <toto>
(
re.compile(r"^<([\w\t ’\'\.\+]+)>[\t ]*([^@\t ]+@[^\t ]+)$", re.UNICODE),
[2, 1, 0],
), # <toto> ...@...
(re.compile(r"^([\w\t ’\'\.\-]+)$", re.UNICODE), [0, 1, 0]), # foo bar
(
re.compile(r"^((?:[\w’\'\.\-]+[\t ]+)+)([^@\t >]+@[^\t >]+)>?$", re.UNICODE),
[2, 1, 0],
), # foo bar toto@titi
(
re.compile(r"^([\w\t ’\'\.\-]+)[\[\(]:?([^\)]+)[\]\)]$", re.UNICODE),
[0, 1, 2],
), # foo bar (:toto)
(
re.compile(r"^([\w\t ’\'\.\-]+):([\w_]+)$", re.UNICODE),
[0, 1, 2],
), # foo bar :toto
(
re.compile(
r"^([\w\t ’\'\.\-]+):([^\t ]+)[\t ]*[\(<]([^@>]+@[^>]+)[\)>]?$", re.UNICODE
),
[3, 1, 2],
), # foo bar :toto <...@...>
(
re.compile(r"^([^\t @]+@[^\t ]+)[\t ]*<([^@>]+@[^>]+)>?$", re.UNICODE),
[2, 0, 0],
), # ...@... <...@...>
(
re.compile(r"^([^\t @]+@[^\t ]+)[\t ]*<([\w\t \.\+]+)>?$", re.UNICODE),
[1, 0, 2],
), # ...@... <toto>
(
re.compile(
r"^[\[\(]:?([^\)]+)[\]\)][\"\t ]*[\(<]([^@>]+@[^>]+)[\)>]?$", re.UNICODE
),
[2, 0, 1],
), # (:toto) <...@...>
(
re.compile(r"^([\w\t ’\'\.\-\\]+)<([^@>]+@[^>]+)>?$", re.UNICODE),
[2, 1, 0],
), # foo \"bar\" <...@...>
(
re.compile(r"^([\w’\'\.\-\+]+)[\t ]*<([^@>]+@[^>]+)>?$", re.UNICODE),
[2, 1, 0],
), # foo-bar.toto <...@...>
(
re.compile(
r"^([\w\t ’\'\.\-]+)\[?<+([^@>]+@[^>]+)>[\t ]*[\w\t \(\)]+$", re.UNICODE
),
[2, 1, 0],
), # foo bar <...@...> (tutu)
(
re.compile(
r"^([\w\t ’\'\.\-]+)[\[\(]:?([^\)\]]+)[\]\)][\t ]*[\[\(]:?([^\)\]]+)[\]\)][\t ]*[\"\t ]*[\(<]([^@>]+@[^>]+)[\)>]?$",
re.UNICODE,
),
[4, 1, 2],
), # foo bar (:toto) (:titi) <...@...>
]
SPLIT_PAT = re.compile(r",|;|(?: \+ )|(?: and )|/|&")
NON_LETTER = re.compile(r"[^a-zA-Z]")
SPECIAL_PAT_1 = re.compile(
r"^(\w+)[\t ]*,[\t ]*(\w+)[\t ]*<([^@]+@[^>]+)>?$", re.UNICODE
)
SPECIAL_PAT_2 = re.compile(r"[\t ]*,[\t ]*")
SPECIAL_PAT_3 = (
re.compile(r"^([\w\t ’\'\.\-]+)<([^@]+@[^>]+)>.*$", re.UNICODE),
[2, 1, 0],
)
SPECIAL_PAT_4 = re.compile(
r"^\"?([\w’\'\.\-\+]+)[\t ]*[\(\"]([^\)\"]+)[\)\"][\t ]*([\w’\'\"\.\-\+]+)\"?[\t ]*<([^@>]+@[^>]+)>?$",
re.UNICODE,
) # foo (bar) toto <...@...>
SPECIAL_PAT_5 = re.compile(
r"^([^>]+)[\t ]*<+([^@>]+@[^>]+)>?$", re.UNICODE
) # ... <...@...>
BUG_PAT = re.compile(r"bug[0-9]+", re.I)
ENCODINGS = ["iso-8859-1", "iso-8859-2"]
def clean_author(author):
"""Remove typos we can have in a author field"""
if author.startswith('"') and author.endswith('"'):
author = author[1:-1]
if author.endswith("."):
author = author[:-1]
if author.endswith(">>"):
author = author[:-1]
if author.startswith("="):
author = author[1:]
author = author.strip()
author = author.replace("%40", "@")
author = author.replace("%gmail", "@gmail")
author = unicodedata.normalize("NFC", author)
return author
def check_pat(pat, positions, author):
"""Check a pattern and return a triple (email, real, nick) according to positions"""
m = pat.match(author)
if m:
return tuple(m.group(p).strip() if p != 0 else "" for p in positions)
return None
def check_common(author):
"""Check for common patterns (as found in PATS)"""
# we check each regex in PATS
for pat, positions in PATS:
r = check_pat(pat, positions, author)
if r:
return r
r = special4(author)
if r:
return r
return None
def check_multiple(author):
"""Check for multiple authors we can have in one author field"""
authors = SPLIT_PAT.split(author)
if len(authors) >= 2 and all("@" in a for a in authors):
res = []
fail = []
for author in authors:
author = author.strip()
r = check_common(author)
if r:
if isinstance(r, list):
res += r
else:
res.append(r)
else:
fail.append(author)
return res, fail
return None, None
def to_ascii_form(s):
"""Remove accent and non-acii chars"""
return unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("utf-8")
def rm_non_letter(s):
"""Remove non-letter chars"""
return NON_LETTER.sub("", s)
def rm_accents(s):
"""Remove accent and punctuation chars"""
r = ""
for c in unicodedata.normalize("NFD", s):
cat = unicodedata.category(c)[0]
# if cat is not Mark, Punctuation
if cat not in {"M", "P"}:
r += c
return r
def cmp_name_email(name, email_name):
"""Compare name and first part of email address"""
name = to_ascii_form(name.lower())
email = rm_non_letter(email_name.lower())
if " " in name:
toks = list(map(rm_non_letter, name.split(" ")))
if "".join(toks) == email:
return True
for tok in toks:
if len(tok) >= 5 and tok in email:
return True
if len(toks) == 2:
toks = toks[::-1]
if "".join(toks) == email:
return True
if toks[0][0] + toks[1] == email:
return True
if toks[1][0] + toks[0] == email:
return True
elif name in email:
return True
return False
def special1(author):
# we've something like foo, bar <foo.bar@toto>
m = SPECIAL_PAT_1.match(author)
if m:
foo, bar, email = m.group(1).lower(), m.group(2).lower(), m.group(3).lower()
if foo in email:
return (email, foo + " " + bar, "")
return None
def special2(author):
# we've something like toto foo, titi bar <toto@...>
toks = author.split(" <")
if len(toks) == 2:
email = toks[1][:-1] if toks[1].endswith(">") else toks[1]
if "@" in email:
before_at = email.split("@")[0]
authors = SPECIAL_PAT_2.split(toks[0])
for a in authors:
if cmp_name_email(a, before_at):
return (email, a, "")
return None
def special3(author):
# we've something like foo, bar <foo.bar@toto>
pat, positions = SPECIAL_PAT_3
r = check_pat(pat, positions, author)
if r:
return r
return None
def special4(author):
m = SPECIAL_PAT_4.match(author)
if m:
return (m.group(4), m.group(1) + " " + m.group(3), m.group(2))
return None
def special5(author):
m = SPECIAL_PAT_5.match(author)
if m:
return (m.group(2), m.group(1), "")
return None
def post_process(r, author):
"""Post process the triple (email, real, nick) to remove typos or stuff like that"""
email, real, nick = r
email = email.strip(" .,")
real = real.strip()
nick = nick.strip()
if email.startswith("mailto:"):
email = email[7:]
elif email.startswith("h<"):
email = email[2:]
email = (
email.replace(",", ".").replace("@.", "@").replace(".@", "@").replace(" ", "")
)
email = BUG_PAT.sub("", email)
if email and not validate_email(email):
logger.error("Email not valid for: {}".format(author))
return (email, real, nick)
def analyze_author_helper(author, first=False):
"""Analyze an author to try to guess the triple (email, realname, nickname)"""
r = check_common(author)
if r:
return r
r, fail = check_multiple(author)
if fail:
logger.error("Failed to parse authors: {}".format(author))
if r:
return r
for special in [special1, special2, special3]:
r = special(author)
if r:
return r
if first:
return None
# maybe we've an encoding issue...
for enc in ENCODINGS:
try:
a = bytes(author, enc).decode("utf-8")
r = analyze_author_helper(a, first=True)
if r:
return r
break
except Exception:
pass
r = special5(author)
if r:
return r
logger.error("Failed to parse author: {}".format(author))
return None
def analyze_author(author, clean=True):
"""Analyze an author to try to guess the triple (email, realname, nickname)"""
if clean:
author = clean_author(author)
r = analyze_author_helper(author)
if r:
if not isinstance(r, list):
return [post_process(r, author)]
else:
return [post_process(x, author) for x in r]
return []
def cmp_name_email1(n, e):
"""Compare name and email to try to find a correspondance between them"""
if e and len(n) == 2:
L = list(filter(None, map(to_ascii_form, n)))
if len(L) == 2:
if e.startswith(".".join(L)) or e.startswith(".".join(L[::-1])):
return True
if e.startswith(L[0][0] + L[1]) or e.startswith(L[1][0] + L[0]):
return True
return False
def equal_author(a, b):
"""Try to guess if two authors are the same"""
ea, ra, na = a
eb, rb, nb = b
if ea and eb:
if ea == eb:
# same email
return True
if (na and eb and eb.startswith(na)) or (nb and ea and ea.startswith(nb)):
return True
if ra and rb:
if ra == rb:
# same real name
return True
names_a = set(map(lambda s: rm_accents(s.lower()), ra.split(" ")))
names_b = set(map(lambda s: rm_accents(s.lower()), rb.split(" ")))
if names_a == names_b:
return True
if len(names_a & names_b) >= 2:
return True
cmp_name_email1(names_a, eb)
cmp_name_email1(names_b, ea)
return False
def gather(authors):
"""Try to gather the same authors in the same bucket"""
res = []
for author in authors:
added = False
for r in res:
if equal_author(r[0], author):
r.append(author)
added = True
break
if not added:
res.append([author])
return res
def collect_authors(path, data=None):
"""Collect authors from a pushlog"""
if not data:
with open(path, "r") as In:
data = json.load(In)
toanalyze = set()
for i in data:
author = i["author"]
if author not in toanalyze:
toanalyze.add(clean_author(author))
res = set(x for author in toanalyze for x in analyze_author(author, clean=False))
return res
def stats(path):
"""Make some stats on authors"""
with open(path, "r") as In:
data = json.load(In)
res = {}
for i in data:
author = i["author"]
if author not in res:
res[author] = {"count": 1, "author": analyze_author(author)}
else:
res[author]["count"] += 1
return res