def archive_message()

in tools/archiver.py [0:0]


    def archive_message(self, mlist, msg, raw_message=None, dry=False, dump=None, defaultepoch=None, digest=False):
        """Send the message to the archiver.

        :param mlist: The IMailingList object.
        :param msg: The message object.
        :param raw_message: Raw message bytes
        :param dry: Whether or not to actually run
        :param dump: Optional path for dump on fail

        :return (lid, mid)
        """

        lid = textlib.normalize_lid(mlist.list_id, strict=True)
        if lid is None:
            raise ValueError(f"Invalid list id {lid}")

        private = False
        if hasattr(mlist, "archive_public") and mlist.archive_public is True:
            private = False
        elif hasattr(mlist, "archive_public") and mlist.archive_public is False:
            private = True
        elif (
            hasattr(mlist, "archive_policy")
            and mlist.archive_policy is not ArchivePolicy.public # pylint: disable=possibly-used-before-assignment
        ):
            private = True

        if raw_message is None:
            raw_message = msg.as_bytes()
        ojson, contents, msg_metadata, irt, skipit = self.compute_updates(
            lid, private, msg, raw_message, defaultepoch
        )
        if not ojson:
            _id = msg.get("message-id") or msg.get("Subject") or msg.get("Date")
            raise ValueError("Could not parse message %s for %s" % (_id, lid))
        if skipit:
            print("Skipping archiving of email due to invalid date and default date set to skip")
            return lid, "(skipped)"
        if digest:
            return lid, ojson["mid"]
        if dry:
            print("**** Dry run, not saving message to database *****")
            return lid, ojson["mid"]

        if dump:
            try:
                elastic = Elastic()
            except elasticsearch.exceptions.ElasticsearchException as e:
                print(e)
                print(
                    "ES connection failed, but dumponfail specified, dumping to %s"
                    % dump
                )
        else:
            elastic = Elastic()

        if config.get("archiver", "threadinfo"):
            try:
                timeout = int(config.get("archiver", "threadtimeout") or 5)
                timeout = str(timeout) + "s"
                limit = int(config.get("archiver", "threadparents") or 10)
                ojson = add_thread_properties(elastic, ojson, timeout, limit)
            except Exception as err:
                print("Could not add thread info", err)
                if logger:
                    logger.info("Could not add thread info %s", err)
            else:
                print("Added thread info successfully", ojson["mid"])
                if logger:
                    logger.info("Added thread info successfully %s", ojson["mid"])

        try:
            if contents:
                for key in contents:
                    elastic.index(
                        index=elastic.db_attachment,
                        id=key,
                        body={"source": contents[key]},
                    )

            elastic.index(
                index=elastic.db_mbox, id=ojson["mid"], body=ojson,
            )

            elastic.index(
                index=elastic.db_source,
                id=ojson["dbid"],
                body={
                    "message-id": msg_metadata["message-id"],
                    "source": mbox_source(raw_message),
                },
            )
            # Write to audit log
            try:
                auditlog_exists = elastic.indices.exists(index=elastic.db_auditlog)
            except elasticsearch.exceptions.AuthorizationException:
                auditlog_exists = False
            if auditlog_exists:
                elastic.index(
                    index=elastic.db_auditlog,
                    body={
                        "date": time.strftime("%Y/%m/%d %H:%M:%S", time.gmtime(time.time())),
                        "action": "index",
                        "remote": "internal",
                        "author": "archiver.py",
                        "target": ojson["mid"],
                        "lid": lid,
                        "log": f"Indexed email {ojson['message-id']} for {lid} as {ojson['mid']}",
                    }
                )

        # If we have a dump dir and ES failed, push to dump dir instead as a JSON object
        # We'll leave it to another process to pick up the slack.
        except Exception as err:
            print('Error on line {}:'.format(sys.exc_info()[-1].tb_lineno), type(err).__name__, err)
            if dump:
                print(
                    "Pushing to ES failed, but dumponfail specified, dumping JSON docs"
                )
                uid = uuid.uuid4()
                mbox_path = os.path.join(dump, "%s.json" % uid)
                with open(mbox_path, "w") as f:
                    json.dump(
                        {
                            "id": ojson["mid"],
                            "mbox": ojson,
                            "mbox_source": {
                                "id": ojson["dbid"],
                                "permalink": ojson["mid"],
                                "message-id": msg_metadata["message-id"],
                                "source": mbox_source(raw_message),
                            },
                            "attachments": contents,
                        },
                        f,
                        indent=2,
                    )
                    f.close()
                sys.exit(0)  # We're exiting here, the rest can't be done without ES
            # otherwise fail as before
            raise err

        if logger:
            logger.info("Pony Mail archived message %s successfully", ojson["mid"])
        oldrefs = []

        # Is this a direct reply to a pony mail email?
        if irt != "":
            dm = re.search(r"pony-([a-f0-9]+)-([a-f0-9]+)@", irt)
            if dm:
                cid = dm.group(1)
                mid = dm.group(2)
                if elastic.exists(index=elastic.db_account, id=cid):
                    doc = elastic.get(index=elastic.db_account, id=cid)
                    if doc:
                        oldrefs.append(cid)
                        # N.B. no index is supplied, so ES will generate one
                        elastic.index(
                            index=elastic.db_notification,
                            body={
                                "type": "direct",
                                "recipient": cid,
                                "list": lid,
                                "private": private,
                                "date": ojson["date"],
                                "from": msg_metadata["from"],
                                "to": msg_metadata["to"],
                                "subject": msg_metadata["subject"],
                                "message-id": msg_metadata["message-id"],
                                "in-reply-to": irt,
                                "epoch": ojson["epoch"],
                                "mid": mid,
                                "seen": 0,
                            },
                        )
                        if logger:
                            logger.info("Notification sent to %s for %s", cid, mid)

        # Are there indirect replies to pony emails?
        if msg_metadata.get("references"):
            for im in re.finditer(
                r"pony-([a-f0-9]+)-([a-f0-9]+)@", msg_metadata.get("references")
            ):
                cid = im.group(1)
                mid = im.group(2)
                # TODO: Fix this to work with pibbles
                if elastic.exists(index=elastic.db_mbox, id=cid):
                    doc = elastic.get(index=elastic.db_mbox, id=cid)

                    # does the user want to be notified of indirect replies?
                    if (
                        doc
                        and "preferences" in doc["_source"]
                        and doc["_source"]["preferences"].get("notifications")
                        == "indirect"
                        and cid not in oldrefs
                    ):
                        oldrefs.append(cid)
                        # N.B. no index mapping is supplied, so ES will generate one
                        elastic.index(
                            index=elastic.db_notification,
                            body={
                                "type": "indirect",
                                "recipient": cid,
                                "list": lid,
                                "private": private,
                                "date": ojson["date"],
                                "from": msg_metadata["from"],
                                "to": msg_metadata["to"],
                                "subject": msg_metadata["subject"],
                                "message-id": msg_metadata["message-id"],
                                "in-reply-to": irt,
                                "epoch": ojson["epoch"],
                                "mid": mid,
                                "seen": 0,
                            },
                        )
                        if logger:
                            logger.info("Notification sent to %s for %s", cid, mid)
        return lid, ojson["mid"]