# tools/import-mbox.py
def run(self):
global goodies, baddies, dedupped
    self.name = self.getName()
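    # ja collects parsed message docs and jas their raw sources; both are flushed to ES in batches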
ja = []
jas = []
self.printid("Thread started")
mla = None
ml = ""
mboxfile = ""
filename = ""
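    # One Archiver instance per thread turns each email into the JSON document(s) we index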
    archie = archiver.Archiver(
        generator=args.generator,
        parse_html=args.html2text,
        ignore_body=args.ibody[0] if args.ibody else None,
        verbose=args.verbose,
        skipff=args.skipff,
    )
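    # Keep claiming work units from the shared 'lists' queue (guarded by 'block') until it is empty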
while len(lists) > 0:
self.printid("%u elements left to slurp" % len(lists))
block.acquire()
try:
mla = lists.pop(0)
if not mla:
self.printid("Nothing more to do here")
return
except Exception as err:
self.printid("Could not pop list: %s" % err)
return
finally:
block.release()
stime = time.time()
dFile = False
if imap:
imap4 = mla[2]
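            # Fetch messages lazily, one IMAP UID at a time, instead of loading the whole folder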
def mailgen(_list):
for uid in _list:
msgbytes = imap4.uid('fetch', uid, '(RFC822)')[1][0][1]
yield email.message_from_bytes(msgbytes)
messages = mailgen(mla[0])
elif filebased:
tmpname = mla[0]
filename = mla[0]
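            # Gzipped archives are decompressed into a throw-away temp file first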
            if ".gz" in filename:
self.printid("Decompressing %s..." % filename)
try:
                    with open(filename, "rb") as bf:
                        bmd = bf.read()
                    # the with block has already closed the file; decompress the buffer in memory
                    bmd = gzip.decompress(bmd)
tmpfile = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
tmpfile.write(bmd)
tmpfile.flush()
tmpfile.close()
tmpname = tmpfile.name
dFile = True # Slated for deletion upon having been read
self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
except Exception as err:
self.printid("This wasn't a gzip file: %s" % err)
self.printid("Slurping %s" % filename)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
messages = mailbox.mbox(tmpname, None if noMboxo else MboxoFactory, create=False)
else:
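            # Fetch the mbox over HTTP from the configured source and spool it to a local file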
ml = mla[0]
mboxfile = mla[1]
self.printid("Slurping %s/%s" % (ml, mboxfile))
            ctx = urlopen("%s%s/%s" % (source, ml, mboxfile))
            inp = ctx.read().decode(ctx.headers.get_content_charset() or 'utf-8', errors='ignore')
            # unique local spool name: hash of randomness, time, list and file name
            tmpname = hashlib.sha224(
                ("%f-%f-%s-%s.mbox" % (random.random(), time.time(), ml, mboxfile)).encode('utf-8')
            ).hexdigest()
with open(tmpname, "w") as f:
f.write(inp)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
messages = mailbox.mbox(tmpname, None if noMboxo else MboxoFactory, create=False)
count = 0
bad = 0
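        # Per-message pipeline: filter, optionally resend, parse, dedup, then queue for bulk insert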
        for key in messages.iterkeys():
            message = messages.get(key)
# If --filter is set, discard any messages not matching by continuing to next email
if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
continue
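            # Resend mode hands each message to the local MTA instead of indexing it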
if resendTo:
self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
s = SMTP('localhost')
try:
if list_override:
message.replace_header('List-ID', list_override)
message.replace_header('To', resendTo)
                except KeyError:
                    # replace_header() raises KeyError when the header is missing; add it instead
                    if list_override:
                        message['List-ID'] = list_override
                del message['cc']  # drop any CC so only resendTo receives the message
                s.send_message(message, from_addr=None, to_addrs=resendTo)
continue
            if time.time() - stime > timeout:  # bail out after N seconds; it shouldn't take this long
self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
break
# Don't pass message to archiver unless we have a list id
if not (list_override or message['list-id']):
self.printid("No list id found for %s " % message['message-id'])
bad += 1
continue
json, contents, _msgdata, _irt = archie.compute_updates(list_override, private, message)
# Not sure this can ever happen
if json and not (json['list'] and json['list_raw']):
self.printid("No list id found for %s " % json['message-id'])
bad += 1
continue
# If --dedup is active, try to filter out any messages that already exist on the list
            if json and dedup and message.get('message-id', None):
                res = es.search(
                    doc_type="mbox",
                    size=1,
                    _source=['mid'],  # so we can report which stored document matched
                    body={
                        'query': {
                            'bool': {
                                'must': [
                                    {'term': {'message-id': message.get('message-id', None)}},
                                    {'term': {'list_raw': json['list']}}
                                ]
                            }
                        }
                    }
                )
if res and res['hits']['total'] > 0:
self.printid("Dedupping %s - matched in %s" % (json['message-id'], res['hits']['hits'][0]['_source']['mid']))
dedupped += 1
continue
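            # Parsed successfully: queue the JSON document and its raw source for insertion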
if json:
                file = messages.get_file(key, True)  # True keeps the leading mbox From_ line
                # If the parsed data is filtered, also need to filter the raw input
                # so the source agrees with the summary info
                if message.__class__.__name__ == 'MboxoFactory':
                    file = MboxoReader(file)
                raw_msg = file.read()
                file.close()
                if args.dups:
                    # record every occurrence of this mid for the duplicate report
                    duplicates.setdefault(json['mid'], []).append(json['message-id'] + " in " + filename)
try: # temporary hack to try and find an encoding issue
# needs to be replaced by proper exception handling
json_source = {
'mid': json['mid'], # needed for bulk-insert only, not needed in database
'message-id': json['message-id'],
'source': archiver.mbox_source(raw_msg)
}
except Exception as e:
self.printid("Error '%s' processing id %s msg %s " % (e, json['mid'], json['message-id']))
bad += 1
continue
count += 1
ja.append(json)
jas.append(json_source)
if args.verbose and verbose_logger:
# TODO optionally show other fields (e.g. From_ line)
verbose_logger.info("MID:%s MSGID:%s" % (json['mid'], json['message-id'].strip()))
if contents:
if not args.dry:
                        # distinct loop variable so the outer mailbox 'key' is not clobbered
                        for att_key in contents:
                            es.index(
                                doc_type="attachment",
                                id=att_key,
                                body={'source': contents[att_key]}
                            )
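                # Flush in batches of 40 so individual bulk requests stay small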
if len(ja) >= 40:
bulk_insert(self.name, ja, es, 'mbox')
ja = []
bulk_insert(self.name, jas, es, 'mbox_source')
jas = []
else:
self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
bad += 1
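        # Per-source accounting and temp-file cleanup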
if filebased:
self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
if dFile:
os.unlink(tmpname)
elif imap:
self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
else:
self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
os.unlink(tmpname)
goodies += count
baddies += bad
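        # Flush any partial batches before claiming the next work unit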
if len(ja) > 0:
bulk_insert(self.name, ja, es, 'mbox')
ja = []
if len(jas) > 0:
bulk_insert(self.name, jas, es, 'mbox_source')
jas = []
self.printid("Done, %u elements left to slurp" % len(lists))