in tools/archiver.py [0:0]
def archive_message(self, mlist, msg, raw_message=None, dry=False, dump=None, defaultepoch=None, digest=False):
    """Send the message to the archiver.

    :param mlist: The IMailingList object.
    :param msg: The message object.
    :param raw_message: Raw message bytes; derived via msg.as_bytes() if omitted.
    :param dry: If True, compute the document but do not save it to the database.
    :param dump: Optional directory path; on ES failure the document is dumped
        there as a JSON file instead of raising.
    :param defaultepoch: Fallback epoch handed to compute_updates for messages
        with an invalid date (presumably "skip" causes archiving to be skipped
        — see the skipit branch below).
    :param digest: If True, return the computed (lid, mid) without indexing.
    :return: (lid, mid) tuple; mid is "(skipped)" when archiving was skipped.
    :raises ValueError: If the list id cannot be normalized.
    :raises Exception: If the message could not be parsed into a document.
    """
    lid = textlib.normalize_lid(mlist.list_id, strict=True)
    if lid is None:
        # Report the original, un-normalized id — lid is always None here,
        # so interpolating it would just print "None".
        raise ValueError(f"Invalid list id {mlist.list_id}")
    # Determine visibility: an explicit archive_public attribute wins;
    # otherwise fall back to the Mailman-style archive_policy attribute.
    private = False
    if hasattr(mlist, "archive_public") and mlist.archive_public is True:
        private = False
    elif hasattr(mlist, "archive_public") and mlist.archive_public is False:
        private = True
    elif (
        hasattr(mlist, "archive_policy")
        and mlist.archive_policy is not ArchivePolicy.public
    ):
        private = True
    if raw_message is None:
        raw_message = msg.as_bytes()
    ojson, contents, msg_metadata, irt, skipit = self.compute_updates(
        lid, private, msg, raw_message, defaultepoch
    )
    if not ojson:
        _id = msg.get("message-id") or msg.get("Subject") or msg.get("Date")
        raise Exception("Could not parse message %s for %s" % (_id, lid))
    if skipit:
        print("Skipping archiving of email due to invalid date and default date set to skip")
        return lid, "(skipped)"
    if digest:
        # Digest mode: document computed, nothing written.
        return lid, ojson["mid"]
    if dry:
        print("**** Dry run, not saving message to database *****")
        return lid, ojson["mid"]
    if dump:
        try:
            elastic = Elastic()
        except elasticsearch.exceptions.ElasticsearchException as e:
            print(e)
            print(
                "ES connection failed, but dumponfail specified, dumping to %s"
                % dump
            )
            # NOTE(review): 'elastic' is intentionally left unbound here; the
            # code below relies on the resulting NameError being caught by
            # the broad except further down, which performs the actual dump.
    else:
        elastic = Elastic()
    if config.get("archiver", "threadinfo"):
        try:
            # Thread lookups are bounded by a timeout (ES duration string,
            # e.g. "5s") and a cap on the number of parents to walk.
            timeout = int(config.get("archiver", "threadtimeout") or 5)
            timeout = str(timeout) + "s"
            limit = int(config.get("archiver", "threadparents") or 10)
            ojson = add_thread_properties(elastic, ojson, timeout, limit)
        except Exception as err:
            # Thread info is best-effort; archiving continues without it.
            print("Could not add thread info", err)
            if logger:
                logger.info("Could not add thread info %s" % (err,))
        else:
            print("Added thread info successfully", ojson["mid"])
            if logger:
                logger.info("Added thread info successfully %s" % (ojson["mid"],))
    try:
        # Index attachments first, then the mbox document, then the raw source.
        if contents:
            for key in contents:
                elastic.index(
                    index=elastic.db_attachment,
                    id=key,
                    body={"source": contents[key]},
                )
        elastic.index(
            index=elastic.db_mbox, id=ojson["mid"], body=ojson,
        )
        elastic.index(
            index=elastic.db_source,
            id=ojson["dbid"],
            body={
                "message-id": msg_metadata["message-id"],
                "source": mbox_source(raw_message),
            },
        )
        # Write to audit log
        try:
            auditlog_exists = elastic.indices.exists(index=elastic.db_auditlog)
        except elasticsearch.exceptions.AuthorizationException:
            # Not allowed to check for the audit log index; skip audit logging.
            auditlog_exists = False
        if auditlog_exists:
            elastic.index(
                index=elastic.db_auditlog,
                body={
                    "date": time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())),
                    "action": "index",
                    "remote": "internal",
                    "author": "archiver.py",
                    "target": ojson["mid"],
                    "lid": lid,
                    "log": f"Indexed email {ojson['message-id']} for {lid} as {ojson['mid']}",
                }
            )
    # If we have a dump dir and ES failed, push to dump dir instead as a JSON object
    # We'll leave it to another process to pick up the slack.
    except Exception as err:
        print(err)
        if dump:
            print(
                "Pushing to ES failed, but dumponfail specified, dumping JSON docs"
            )
            uid = uuid.uuid4()
            mbox_path = os.path.join(dump, "%s.json" % uid)
            with open(mbox_path, "w") as f:
                json.dump(
                    {
                        "id": ojson["mid"],
                        "mbox": ojson,
                        "mbox_source": {
                            "id": ojson["dbid"],
                            "permalink": ojson["mid"],
                            "message-id": msg_metadata["message-id"],
                            "source": mbox_source(raw_message),
                        },
                        "attachments": contents,
                    },
                    f,
                    indent=2,
                )
                # No explicit f.close() needed: the 'with' block closes it.
            sys.exit(0)  # We're exiting here, the rest can't be done without ES
        # otherwise fail as before
        raise err
    if logger:
        logger.info("Pony Mail archived message %s successfully", ojson["mid"])
    oldrefs = []
    # Is this a direct reply to a pony mail email?
    if irt != "":
        dm = re.search(r"pony-([a-f0-9]+)-([a-f0-9]+)@", irt)
        if dm:
            cid = dm.group(1)
            mid = dm.group(2)
            if elastic.exists(index=elastic.db_account, id=cid):
                doc = elastic.get(index=elastic.db_account, id=cid)
                if doc:
                    oldrefs.append(cid)
                    # N.B. no index is supplied, so ES will generate one
                    elastic.index(
                        index=elastic.db_notification,
                        body={
                            "type": "direct",
                            "recipient": cid,
                            "list": lid,
                            "private": private,
                            "date": ojson["date"],
                            "from": msg_metadata["from"],
                            "to": msg_metadata["to"],
                            "subject": msg_metadata["subject"],
                            "message-id": msg_metadata["message-id"],
                            "in-reply-to": irt,
                            "epoch": ojson["epoch"],
                            "mid": mid,
                            "seen": 0,
                        },
                    )
                    if logger:
                        logger.info("Notification sent to %s for %s", cid, mid)
    # Are there indirect replies to pony emails?
    if msg_metadata.get("references"):
        for im in re.finditer(
            r"pony-([a-f0-9]+)-([a-f0-9]+)@", msg_metadata.get("references")
        ):
            cid = im.group(1)
            mid = im.group(2)
            # TODO: Fix this to work with pibbles
            if elastic.exists(index=elastic.db_mbox, id=cid):
                doc = elastic.get(index=elastic.db_mbox, id=cid)
                # does the user want to be notified of indirect replies?
                if (
                    doc
                    and "preferences" in doc["_source"]
                    and doc["_source"]["preferences"].get("notifications")
                    == "indirect"
                    and cid not in oldrefs
                ):
                    oldrefs.append(cid)
                    # N.B. no index mapping is supplied, so ES will generate one
                    elastic.index(
                        index=elastic.db_notification,
                        body={
                            "type": "indirect",
                            "recipient": cid,
                            "list": lid,
                            "private": private,
                            "date": ojson["date"],
                            "from": msg_metadata["from"],
                            "to": msg_metadata["to"],
                            "subject": msg_metadata["subject"],
                            "message-id": msg_metadata["message-id"],
                            "in-reply-to": irt,
                            "epoch": ojson["epoch"],
                            "mid": mid,
                            "seen": 0,
                        },
                    )
                    if logger:
                        logger.info("Notification sent to %s for %s", cid, mid)
    return lid, ojson["mid"]