scripts/comparison_3x.py (176 lines of code) (raw):
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymongo
import time
import random
import sys
import getopt
# constant
COMPARISION_COUNT = "comparison_count"
COMPARISION_MODE = "comparisonMode"
EXCLUDE_DBS = "excludeDbs"
EXCLUDE_COLLS = "excludeColls"
SAMPLE = "sample"
# we don't check collections and index here because sharding's collection(`db.stats`) is splitted.
CheckList = {"objects": 1, "numExtents": 1, "ok": 1}
configure = {}
def log_info(message):
print("INFO [%s] %s " % (time.strftime('%Y-%m-%d %H:%M:%S'), message))
def log_error(message):
print("ERROR [%s] %s " % (time.strftime('%Y-%m-%d %H:%M:%S'), message))
class MongoCluster:
# pymongo connection
conn = None
# connection string
url = ""
def __init__(self, url):
self.url = url
def connect(self):
self.conn = pymongo.MongoClient(self.url)
def close(self):
self.conn.close()
def filter_check(m):
new_m = {}
for k in CheckList:
new_m[k] = m[k]
return new_m
"""
check meta data. include db.collection names and stats()
"""
def check(src, dst):
#
# check metadata
#
srcDbNames = src.conn.list_database_names()
dstDbNames = dst.conn.list_database_names()
srcDbNames = [db for db in srcDbNames if db not in configure[EXCLUDE_DBS]]
dstDbNames = [db for db in dstDbNames if db not in configure[EXCLUDE_DBS]]
if len(srcDbNames) != len(dstDbNames):
log_error("DIFF => database count not equals src[%s] != dst[%s].\nsrc: %s\ndst: %s" % (len(srcDbNames),
len(dstDbNames),
srcDbNames,
dstDbNames))
return False
else:
log_info("EQUL => database count equals")
# check database names and collections
for db in srcDbNames:
if db in configure[EXCLUDE_DBS]:
log_info("IGNR => ignore database [%s]" % db)
continue
if dstDbNames.count(db) == 0:
log_error("DIFF => database [%s] only in srcDb" % (db))
return False
# db.stats() comparison
srcDb = src.conn[db]
dstDb = dst.conn[db]
# srcStats = srcDb.command("dbstats")
# dstStats = dstDb.command("dbstats")
#
# srcStats = filter_check(srcStats)
# dstStats = filter_check(dstStats)
#
# if srcStats != dstStats:
# log_error("DIFF => database [%s] stats not equals src[%s], dst[%s]" % (db, srcStats, dstStats))
# return False
# else:
# log_info("EQUL => database [%s] stats equals" % db)
# for collections in db
srcColls = srcDb.list_collection_names()
dstColls = dstDb.list_collection_names()
srcColls = [coll for coll in srcColls if coll not in configure[EXCLUDE_COLLS] and srcColls.count(coll) > 0]
dstColls = [coll for coll in dstColls if coll not in configure[EXCLUDE_COLLS] and dstColls.count(coll) > 0]
if len(srcColls) != len(dstColls):
log_error("DIFF => database [%s] collections count not equals, src[%s], dst[%s]" % (db, srcColls, dstColls))
return False
else:
log_info("EQUL => database [%s] collections count equals" % (db))
for coll in srcColls:
if coll in configure[EXCLUDE_COLLS]:
log_info("IGNR => ignore collection [%s]" % coll)
continue
if dstColls.count(coll) == 0:
log_error("DIFF => collection only in source [%s]" % (coll))
return False
srcColl = srcDb[coll]
dstColl = dstDb[coll]
log_info("compare count for collection [%s]" % coll)
# comparison collection records number
if srcColl.estimated_document_count() != dstColl.estimated_document_count():
log_error("DIFF => collection [%s] record count not equals" % (coll))
return False
else:
log_info("EQUL => collection [%s] record count equals" % (coll))
log_info("compare index for collection [%s]" % coll)
# comparison collection index number
src_index_length = len(srcColl.index_information())
dst_index_length = len(dstColl.index_information())
if src_index_length != dst_index_length:
log_error("DIFF => collection [%s] index number not equals: src[%r], dst[%r]" % (coll, src_index_length, dst_index_length))
return False
else:
log_info("EQUL => collection [%s] index number equals" % (coll))
log_info("compare data sample for collection [%s]" % coll)
# check sample data
if not data_comparison(srcColl, dstColl, configure[COMPARISION_MODE]):
log_error("DIFF => collection [%s] data comparison not equals" % (coll))
return False
else:
log_info("EQUL => collection [%s] data data comparison exactly eauals" % (coll))
return True
"""
check sample data. comparison every entry
"""
def data_comparison(srcColl, dstColl, mode):
if mode == "no":
return True
elif mode == "sample":
# srcColl.count() mus::t equals to dstColl.count()
count = configure[COMPARISION_COUNT] if configure[COMPARISION_COUNT] <= srcColl.estimated_document_count() else srcColl.estimated_document_count()
else: # all
count = srcColl.count_documents({})
if count == 0:
return True
rec_count = count
batch = 16
show_progress = (batch * 64)
total = 0
while count > 0:
# sample a bounch of docs
docs = srcColl.aggregate([{"$sample": {"size":batch}}])
while docs.alive:
doc = docs.next()
migrated = dstColl.find_one(doc["_id"])
# both origin and migrated bson is Map . so use ==
if doc != migrated:
log_error("DIFF => src_record[%s], dst_record[%s]" % (doc, migrated))
return False
total += batch
count -= batch
if total % show_progress == 0:
log_info(" ... process %d docs, %.2f %% !" % (total, total * 100.0 / rec_count))
return True
def usage():
print('|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|')
print("| Usage: ./comparison.py --src=localhost:27017/db? --dest=localhost:27018/db? --count=10000 (the sample number) --excludeDbs=admin,local --excludeCollections=system.profile --comparisonMode=sample/all/no (sample: comparison sample number, default; all: comparison all data; no: only comparison outline without data) |")
print('|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|')
print('| Like : ./comparison.py --src="localhost:3001" --dest=localhost:3100 --count=1000 --excludeDbs=admin,local,mongoshake --excludeCollections=system.profile --comparisonMode=sample |')
print('|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|')
exit(0)
if __name__ == "__main__":
opts, args = getopt.getopt(sys.argv[1:], "hs:d:n:e:x:", ["help", "src=", "dest=", "count=", "excludeDbs=", "excludeCollections=", "comparisonMode="])
configure[SAMPLE] = True
configure[EXCLUDE_DBS] = []
configure[EXCLUDE_COLLS] = []
srcUrl, dstUrl = "", ""
for key, value in opts:
if key in ("-h", "--help"):
usage()
if key in ("-s", "--src"):
srcUrl = value
if key in ("-d", "--dest"):
dstUrl = value
if key in ("-n", "--count"):
configure[COMPARISION_COUNT] = int(value)
if key in ("-e", "--excludeDbs"):
configure[EXCLUDE_DBS] = value.split(",")
if key in ("-x", "--excludeCollections"):
configure[EXCLUDE_COLLS] = value.split(",")
if key in ("--comparisonMode"):
print(value)
if value != "all" and value != "no" and value != "sample":
log_info("comparisonMode[%r] illegal" % (value))
exit(1)
configure[COMPARISION_MODE] = value
if COMPARISION_MODE not in configure:
configure[COMPARISION_MODE] = "sample"
# params verify
if len(srcUrl) == 0 or len(dstUrl) == 0:
usage()
# default count is 10000
if configure.get(COMPARISION_COUNT) is None or configure.get(COMPARISION_COUNT) <= 0:
configure[COMPARISION_COUNT] = 10000
# ignore databases
configure[EXCLUDE_DBS] += ["admin", "local"]
configure[EXCLUDE_COLLS] += ["system.profile"]
# dump configuration
log_info("Configuration [sample=%s, count=%d, excludeDbs=%s, excludeColls=%s]" % (configure[SAMPLE], configure[COMPARISION_COUNT], configure[EXCLUDE_DBS], configure[EXCLUDE_COLLS]))
try :
src, dst = MongoCluster(srcUrl), MongoCluster(dstUrl)
print("[src = %s]" % srcUrl)
print("[dst = %s]" % dstUrl)
src.connect()
dst.connect()
except (Exception, e):
print(e)
log_error("create mongo connection failed %s|%s" % (srcUrl, dstUrl))
exit()
if check(src, dst):
print("SUCCESS")
exit(0)
else:
print("FAIL")
exit(-1)
src.close()
dst.close()