in tools/import-mbox.py
def run(self):
global goodies, baddies, dedupped # pylint: disable=global-statement # TODO:FIX
ja = []
jas = []
self.printid("Thread started")
mla = None
ml = ""
mboxfile = ""
filename = ""
archie = archiver.Archiver(
generator=args.generator, parse_html=args.html2text, ignore_body=args.ibody, verbose=args.verbose
)
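    # Worker loop: pull one work item off the shared `lists` queue (guarded by
    # `block`) until it is empty. Each item describes an IMAP batch, a local
    # mbox/maildir path, or a (list, mboxfile) pair to fetch over HTTP.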
while len(lists) > 0:
self.printid("%u elements left to slurp" % len(lists))
block.acquire()
try:
mla = lists.pop(0)
if not mla:
self.printid("Nothing more to do here")
return
except Exception as err:
self.printid("Could not pop list: %s" % err)
return
finally:
block.release()
stime = time.time()
tmpname = ""
delete_file = False
useMboxo = False
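        # Three input modes: IMAP (a pre-fetched list of UIDs), a local file
        # (possibly a gzipped mbox, or a maildir), or a remote mbox downloaded
        # from `source`.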
if imap:
imap4 = mla[2]
tmpname = "IMAP"
def mailgen(_list):
for uid in _list:
msgbytes = imap4.uid("fetch", uid, "(RFC822)")[1][0][1]
yield archiver.parse_message(msgbytes)
messages = mailgen(mla[0])
elif filebased:
tmpname = mla[0]
filename = mla[0]
            if filename.endswith(".gz"):
self.printid("Decompressing %s..." % filename)
try:
                    with open(filename, "rb") as bf:
                        bmd = bf.read()
                    # the with-block has closed the file; decompress in memory
                    bmd = gzip.decompress(bmd)
tmpfile = tempfile.NamedTemporaryFile(
mode="w+b", delete=False
)
tmpfile.write(bmd)
tmpfile.flush()
tmpfile.close()
tmpname = tmpfile.name
                    delete_file = True # remove the temp file once it has been read
self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
except Exception as err:
self.printid("This wasn't a gzip file: %s" % err)
self.printid("Slurping %s" % filename)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
                useMboxo = not noMboxo
messages = mailbox.mbox(
# pylint: disable-next=possibly-used-before-assignment
tmpname, None if noMboxo else MboxoFactory, create=False
)
else:
ml = mla[0]
mboxfile = mla[1]
self.printid("Slurping %s/%s" % (ml, mboxfile))
ctx = urlopen("%s%s/%s" % (source, ml, mboxfile))
inp = ctx.read().decode(
ctx.headers.get_content_charset() or "utf-8", errors="ignore"
)
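            # Unique scratch filename for the downloaded mbox.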
tmpname = hashlib.sha224(
(
"%f-%f-%s-%s.mbox"
% (random.random(), time.time(), ml, mboxfile)
).encode("utf-8")
).hexdigest()
with open(tmpname, "w") as f:
f.write(inp)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
messages = mailbox.mbox(
tmpname, None if noMboxo else MboxoFactory, create=False
)
count = 0
bad = 0
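        # Common path: walk every message in the mailbox, filter, transform,
        # and queue it for bulk indexing.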
for key in messages.iterkeys():
if maildir:
file = messages.get_file(key)
else: # must be mbox, which has an extra from_ parameter
file = messages.get_file(key, True) # pylint: disable=too-many-function-args
# If the parsed data is filtered, also need to filter the raw input
# so the source agrees with the summary info
if useMboxo:
file = MboxoReader(file) # pylint: disable=possibly-used-before-assignment
message_raw = file.read()
file.close()
message = archiver.parse_message(message_raw)
if not message:
self.printid("Message %u could not be extracted from %s, ignoring it" % (key, tmpname))
continue
# If --filter is set, discard any messages not matching by continuing to next email
if (
fromFilter
and "from" in message
and message["from"].find(fromFilter) == -1
):
continue
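            # Resend mode: if a target address was given, re-deliver the
            # message through a local MTA instead of indexing it.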
if resendTo:
                self.printid(
                    "Delivering message %s via MTA"
                    % (message["message-id"] if "message-id" in message else "??")
                )
s = SMTP("localhost") # pylint: disable=possibly-used-before-assignment
try:
if list_override:
message.replace_header("List-ID", list_override)
message.replace_header("To", resendTo)
except Exception: # TODO: narrow exception
if list_override:
message["List-ID"] = list_override
message["cc"] = None
                s.send_message(message, from_addr=None, to_addrs=resendTo)
                s.quit()
                continue
if (
time.time() - stime > timeout
            ): # break out after N seconds; it shouldn't take this long!
self.printid(
"Whoa, this is taking way too long, ignoring %s for now"
% tmpname
)
break
# Don't pass message to archiver unless we have a list id
if not (list_override or message["list-id"]):
self.printid("No list id found for %s " % message["message-id"])
bad += 1
continue
# If fetched from Pipermail, we have to revert/reconstruct From: headers sometimes,
# before we can pass it off to final archiving.
if args.pipermail and " at " in str(message.get("from")):
m = re.match(r"^(\S+) at (\S+) \((.+)\)$", str(message["from"]))
if m:
message.replace_header("from", "%s <%s@%s>" % (m.group(3), m.group(1), m.group(2)))
json, contents, _msgdata, _irt, skipit = archie.compute_updates(
list_override, private, message, message_raw
)
if skipit:
continue
# Not sure this can ever happen
if json and not (json["list"] and json["list_raw"]):
self.printid("No list id found for %s " % json["message-id"])
bad += 1
continue
# If --dedup is active, try to filter out any messages that already exist on the list
if json and dedup and message.get("message-id", None):
res = es.search( # pylint: disable=possibly-used-before-assignment
index=es.db_mbox,
doc_type="_doc",
size=1,
_source=["mid"], # so can report the match source
body={
"query": {
"bool": {
"must": [
{
"term": {
"message-id": message.get(
"message-id", None
)
}
},
{"term": {"list_raw": json["list"]}},
]
}
}
},
)
if res and res["hits"]["total"]["value"] > 0:
self.printid(
"Dedupping %s - matched in %s"
% (
json["message-id"],
res["hits"]["hits"][0]["_source"]["mid"],
)
)
dedupped += 1
continue
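            # The message parsed and passed the optional dedup check; queue it
            # (and its raw source) for indexing unless this is a dry run.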
if json:
if args.dups:
                        duplicates.setdefault(json["mid"], []).append(
                            json["message-id"] + " in " + filename
                        )
# Mark that we imported this email
json["_notes"] = [x for x in json["_notes"] if "ARCHIVE:" not in x] # Pop archiver.py note
json["_notes"].append(["IMPORT: Email imported as %s at %u" % (json["mid"], time.time())])
try: # temporary hack to try and find an encoding issue
# needs to be replaced by proper exception handling
json_source = {
"mid": json["dbid"], # this is only needed for bulk_insert to set up the _id
"message-id": json["message-id"],
"source": archiver.mbox_source(message_raw),
}
except Exception as e:
self.printid(
"Error '%s' processing id %s msg %s "
% (e, json["mid"], json["message-id"])
)
bad += 1
continue
count += 1
if args.verbose and verbose_logger:
# TODO optionally show other fields (e.g. From_ line)
verbose_logger.info("MID:%(mid)s DBID: %(dbid)s MSGID:%(message-id)s", json)
# Nothing more to do if dry run
if args.dry:
if dumpfile:
                        import json as JSON # aliased: the local variable `json` holds the document
# drop fields with timestamps
                        del json["_notes"]
                        del json["_archived_at"]
JSON.dump(json, dumpfile, indent=2, sort_keys=True, ensure_ascii=False)
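                        # NB: each record is followed by ",\n", so the dump is
                        # not valid JSON on its own and needs post-processing.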
dumpfile.write(",\n")
continue
ja.append(json)
jas.append(json_source)
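                # Attachments are indexed individually, keyed by the ids the
                # archiver assigned in `contents`.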
if contents:
for key in contents:
es.index(
index=es.db_attachment,
doc_type="_doc",
id=key,
body={"source": contents[key]},
)
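                # Flush both queues to Elasticsearch once 40 documents have
                # accumulated, keeping individual bulk requests small.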
if len(ja) >= 40:
bulk_insert_both(self.name, ja, jas, es)
ja = []
jas = []
else:
self.printid(
"Failed to parse: Return=%s Message-Id=%s"
% (message.get("Return-Path"), message.get("Message-Id"))
)
bad += 1
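        # Per-source summary, temp-file cleanup and global counter updates.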
if filebased:
self.printid(
"Parsed %u records (failed: %u) from %s" % (count, bad, filename)
)
if delete_file:
os.unlink(tmpname)
elif imap:
self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
else:
self.printid(
"Parsed %s/%s: %u records (failed: %u) from %s"
% (ml, mboxfile, count, bad, tmpname)
)
os.unlink(tmpname)
goodies += count
baddies += bad
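    # Flush any remainder that did not fill a complete batch.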
if len(ja) > 0 and not args.dry:
bulk_insert_both(self.name, ja, jas, es)
ja = []
jas = []
self.printid("Done, %u elements left to slurp" % len(lists))