tools/data_generator.py:
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import traceback
import threading
import random
import pymongo
import time
import os

mongo_url = "mongodb://127.0.0.1:8001"
mongo_database = "generator"

globalLock = threading.Lock()
# Completion marker: the main thread adds every worker's index up front and
# each worker subtracts its own index when it finishes, so mark drops back
# to 0 once all workers are done.
mark = 0
init_schema_only = False  # set True to create collections and indexes without populating data
def log_info(message):
    print("[%s] %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), message))


def random_string(length=8):
    # build a random alphanumeric string of the given length
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    return ''.join(random.choice(chars) for _ in range(length))
def worker(collection, index):
    global mark
    # schema:
    #   userId, phone, firstName, lastName, info{col1, col2}
    #
    # create a number of indexes, most of them unique
    collection.create_index([('userId', pymongo.ASCENDING)], unique=True)
    collection.create_index([('firstName', pymongo.ASCENDING), ('lastName', pymongo.DESCENDING)], unique=True)
    collection.create_index([('phone', pymongo.ASCENDING)])
    collection.create_index([('info.col2', pymongo.ASCENDING)], unique=True)
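
    # Illustrative document shape (values are examples matching the inserts below):
    #   {"userId": "aB3xK9qZ", "firstName": "Kobe_0", "lastName": "Bryant_0",
    #    "phone": 18911100000, "info": {"col1": 0, "col2": 0}}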
    if init_schema_only:
        log_info("worker-%d only creates the collection's schema; no data is populated" % index)
    else:
        log_info("worker-%d starts to fill up data" % index)
        # insert lots of records, flushing in batches of 100
        success = []
        batch = []
        times = 5000
        for i in range(0, times):
            try:
                userId = random_string()
                batch.append({'userId': userId, 'firstName': "Kobe_%d" % i, 'lastName': "Bryant_%d" % (i * 17),
                              'phone': 18911100000 + i, "info": {"col1": i, "col2": i}})
                if len(batch) >= 100:
                    collection.insert_many(batch)
                    # remember every inserted userId, not just the last one per batch
                    success.extend(doc['userId'] for doc in batch)
                    batch = []
            except Exception as e:
                log_info("duplicated records insert %d.... just skip" % i)
                print(e)
                batch = []  # drop the failed batch instead of retrying it forever
        if batch:  # flush any remainder
            collection.insert_many(batch)
            success.extend(doc['userId'] for doc in batch)
        log_info("worker-%d finishes the filling phase" % index)
        # randomly update, 10x as many times as the insert phase; the unique
        # index on info.col2 makes occasional duplicate-key failures expected
        for i in range(0, times * 10):
            try:
                n = random.randint(0, len(success) - 1)
                collection.replace_one({'userId': success[n]},
                                       {'userId': success[n], 'firstName': "Michael_%d" % i, 'lastName': "Jordan_%d" % (i * 13),
                                        'phone': 17711100000 + i, "info": {"col1": times + n, "col2": times + i * 3}})
            except Exception as e:
                log_info("duplicated records update %d.... just skip" % i)
                print(e)
        log_info("worker-%d finishes the random updates" % index)
    log_info("worker-%d completes the job" % index)
    # signal completion by subtracting this worker's index from the shared marker
    with globalLock:
        mark = mark - index
if __name__ == "__main__":
    conn = pymongo.MongoClient(mongo_url)
    try:
        # a cheap round-trip to verify the server is actually reachable
        conn.admin.command('ping')
    except Exception:
        log_info("connect to MongoDB %s failed" % mongo_url)
        os._exit(1)
    try:
        # purge all data first!
        log_info("1). drop database")
        conn.drop_database(mongo_database)
        database = conn[mongo_database]
        colls = []
        log_info("2). start workers")
        nWorker = 16
        # pre-charge the completion marker with every worker's index
        for i in range(0, nWorker):
            mark = mark + i
        for i in range(0, nWorker):
            # spawn one worker thread per collection
            collection = database["test%d" % i]
            threading.Thread(target=worker, args=(collection, i)).start()
            colls.append(collection)
        log_info("3). wait for completion")
        # poll until every worker has subtracted its index back out of mark
        time.sleep(5)
        while True:
            with globalLock:
                quit = (mark == 0)
            if quit:
                break
            time.sleep(1)
    except Exception:
        traceback.print_exc()
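
# Usage sketch (assumes a mongod listening on 127.0.0.1:8001, per mongo_url above):
#   $ python3 tools/data_generator.py
# The script drops the "generator" database, then 16 worker threads each build
# indexes on, populate, and randomly update their own "test<N>" collection.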