def run()

in tools/import-mbox.py [0:0]


    def run(self):
        """Worker loop: take sources off the shared ``lists`` queue and import them.

        Depending on the module-level ``imap`` / ``filebased`` flags, a queue
        entry is read as a list of IMAP UIDs, as a local mbox/Maildir path
        (decompressed first if the name contains ".gz"), or as a list/mbox
        pair that is downloaded from ``source`` into a temporary file.

        Each message is then either re-sent through the MTA on localhost
        (when ``resendTo`` is set) or passed to
        ``archiver.Archiver.compute_updates``; successfully parsed messages
        are queued and flushed with ``bulk_insert`` to the 'mbox' and
        'mbox_source' document types once 40 have accumulated, and again at
        the end of every source.

        Side effects: adds to the module-level counters ``goodies``,
        ``baddies`` and ``dedupped``; removes the temporary files it created.
        Returns None.
        """
        global goodies, baddies, dedupped
        self.name = Thread.getName(self)
        ja = []   # parsed documents waiting to be bulk-inserted as 'mbox'
        jas = []  # matching raw sources waiting to be bulk-inserted as 'mbox_source'
        self.printid("Thread started")
        mla = None
        ml = ""
        mboxfile = ""
        filename = ""

        archie = archiver.Archiver(generator=args.generator, parse_html=args.html2text,
                                   ignore_body=args.ibody[0] if args.ibody else None,
                                   verbose=args.verbose,
                                   skipff=args.skipff)

        while len(lists) > 0:
            self.printid("%u elements left to slurp" % len(lists))

            # The emptiness test above is done without the lock, so the pop
            # itself can still fail; that case is reported and ends the thread.
            block.acquire()
            try:
                mla = lists.pop(0)
                if not mla:
                    self.printid("Nothing more to do here")
                    return
            except Exception as err:
                self.printid("Could not pop list: %s" % err)
                return
            finally:
                block.release()

            stime = time.time()
            dFile = False  # True when tmpname is a decompressed copy we must delete
            if imap:
                imap4 = mla[2]

                def mailgen(_list):
                    for uid in _list:
                        msgbytes = imap4.uid('fetch', uid, '(RFC822)')[1][0][1]
                        yield email.message_from_bytes(msgbytes)
                # NOTE(review): this is a plain generator, which has no
                # iterkeys()/get()/get_file(); the message loop below looks
                # like it cannot work for IMAP sources as written - confirm.
                messages = mailgen(mla[0])
            elif filebased:
                tmpname = mla[0]
                filename = mla[0]
                if ".gz" in filename:
                    self.printid("Decompressing %s..." % filename)
                    try:
                        with open(filename, "rb") as bf:
                            bmd = bf.read()
                        # Input file is closed before the (larger) decompressed copy is built
                        bmd = gzip.decompress(bmd)
                        with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tmpfile:
                            tmpfile.write(bmd)
                        tmpname = tmpfile.name
                        dFile = True # Slated for deletion upon having been read
                        self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
                    except Exception as err:
                        # Best effort: fall through and read the original file as-is
                        self.printid("This wasn't a gzip file: %s" % err)
                self.printid("Slurping %s" % filename)
            else:
                ml = mla[0]
                mboxfile = mla[1]
                self.printid("Slurping %s/%s" % (ml, mboxfile))
                ctx = urlopen("%s%s/%s" % (source, ml, mboxfile ))
                inp = ctx.read().decode(ctx.headers.get_content_charset() or 'utf-8', errors='ignore')

                # Random, time-based name for the downloaded copy (created in the current directory)
                tmpname = hashlib.sha224(("%f-%f-%s-%s.mbox" % (random.random(), time.time(), ml, mboxfile)).encode('utf-8') ).hexdigest()
                with open(tmpname, "w") as f:
                    f.write(inp)

            if not imap:
                # Both on-disk variants are opened the same way from tmpname
                if maildir:
                    messages = mailbox.Maildir(tmpname, create=False)
                else:
                    messages = mailbox.mbox(tmpname, None if noMboxo else MboxoFactory, create=False)

            count = 0
            bad = 0

            for key in messages.iterkeys():
                message = messages.get(key)
                # If --filter is set, discard any messages not matching by continuing to next email
                if fromFilter and 'from' in message and fromFilter not in message['from']:
                    continue
                if resendTo:
                    # The conditional must be parenthesised: '%' binds tighter than 'if/else'
                    self.printid("Delivering message %s via MTA"
                                 % (message['message-id'] if 'message-id' in message else '??'))
                    new_headers = [('To', resendTo)]
                    if list_override:
                        new_headers.insert(0, ('List-ID', list_override))
                    # replace_header() raises KeyError when the header is absent;
                    # handle each header on its own so one missing header does not
                    # prevent the other from being rewritten.
                    for hname, hvalue in new_headers:
                        try:
                            message.replace_header(hname, hvalue)
                        except KeyError:
                            message[hname] = hvalue
                    # NOTE(review): this appends a 'cc' header with value None; it
                    # does not remove an existing Cc header - confirm intent.
                    message['cc'] = None
                    # Context manager closes the connection after each delivery
                    with SMTP('localhost') as s:
                        s.send_message(message, from_addr=None, to_addrs=resendTo)
                    continue
                if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
                    self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
                    break

                # Don't pass message to archiver unless we have a list id
                if not (list_override or message['list-id']):
                    self.printid("No list id found for %s " % message['message-id'])
                    bad += 1
                    continue

                json, contents, _msgdata, _irt = archie.compute_updates(list_override, private, message)

                # Not sure this can ever happen
                if json and not (json['list'] and json['list_raw']):
                    self.printid("No list id found for %s " % json['message-id'])
                    bad += 1
                    continue

                # If --dedup is active, try to filter out any messages that already exist on the list
                if json and dedup and message.get('message-id', None):
                    res = es.search(
                        doc_type="mbox",
                        size = 1,
                        _source = ['mid'], # so can report the match source
                        body = {
                            'query': {
                                'bool': {
                                    'must': [
                                        {
                                            'term': {
                                                'message-id': message.get('message-id', None)
                                            }
                                        },
                                        {
                                            'term': {
                                                'list_raw': json['list']
                                            }
                                        }
                                    ]
                                }
                            }
                        }
                    )
                    if res and res['hits']['total'] > 0:
                        self.printid("Dedupping %s - matched in %s" % (json['message-id'], res['hits']['hits'][0]['_source']['mid']))
                        dedupped += 1
                        continue

                if not json:
                    self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
                    bad += 1
                    continue

                msg_file = messages.get_file(key, True)
                # If the parsed data is filtered, also need to filter the raw input
                # so the source agrees with the summary info
                if message.__class__.__name__ == 'MboxoFactory':
                    msg_file = MboxoReader(msg_file)
                raw_msg = msg_file.read()
                msg_file.close()
                if args.dups:
                    # Record every place a given mid was seen
                    duplicates.setdefault(json['mid'], []).append(json['message-id'] + " in " + filename)

                try: # temporary hack to try and find an encoding issue
                    # needs to be replaced by proper exception handling
                    json_source = {
                        'mid': json['mid'], # needed for bulk-insert only, not needed in database
                        'message-id': json['message-id'],
                        'source': archiver.mbox_source(raw_msg)
                    }
                except Exception as e:
                    self.printid("Error '%s' processing id %s msg %s " % (e, json['mid'], json['message-id']))
                    bad += 1
                    continue

                count += 1
                ja.append(json)
                jas.append(json_source)
                if args.verbose and verbose_logger:
                    # TODO optionally show other fields (e.g. From_ line)
                    verbose_logger.info("MID:%s MSGID:%s" % (json['mid'], json['message-id'].strip()))
                if contents and not args.dry:
                    # Attachments are indexed one by one, keyed by the ids in contents
                    for attachment_id in contents:
                        es.index(
                            doc_type="attachment",
                            id=attachment_id,
                            body = {
                                'source': contents[attachment_id]
                            }
                        )
                if len(ja) >= 40:
                    bulk_insert(self.name, ja, es, 'mbox')
                    ja = []

                    bulk_insert(self.name, jas, es, 'mbox_source')
                    jas = []

            if filebased:
                self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
                if dFile:
                    os.unlink(tmpname)
            elif imap:
                self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
            else:
                self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
                os.unlink(tmpname)

            goodies += count
            baddies += bad

            # Flush whatever is left over from this source
            if ja:
                bulk_insert(self.name, ja, es, 'mbox')
            ja = []

            if jas:
                bulk_insert(self.name, jas, es, 'mbox_source')
            jas = []
        self.printid("Done, %u elements left to slurp" % len(lists))