in mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java [66:167]
/**
 * Splits a MongoDB collection into contiguous, ordered {@code _id} ranges for parallel reading.
 *
 * <p>Split points come from the {@code splitVector} command when the user is allowed to run it
 * (clusterManager role); otherwise they are located by walking the collection in ascending
 * {@code _id} order with {@code skip}. The first range starts at the sentinel {@code "min"}, the
 * last ends at the sentinel {@code "max"}, and every inner bound is an {@code _id} value
 * (rendered as a hex string when {@code isObjectId} is true).
 *
 * @param adviceNumber desired number of ranges; zero or negative values are treated like 1
 * @param mongoClient client used to run the commands
 * @param dbName database containing the collection
 * @param collName collection to split
 * @param isObjectId whether {@code _id} values are {@link ObjectId}s to be emitted as hex strings
 * @return ranges in ascending bound order; empty when {@code collStats} reports no documents
 *     (unless a single range was requested)
 * @throws IllegalStateException if {@code collStats} does not report a numeric {@code count}
 */
private static List<Range> doSplitCollection(int adviceNumber, MongoClient mongoClient,
        String dbName, String collName, boolean isObjectId) {
    MongoDatabase database = mongoClient.getDatabase(dbName);
    List<Range> rangeList = new ArrayList<Range>();
    if (adviceNumber <= 1) {
        // No split requested: a single range covering the whole collection.
        Range range = new Range();
        range.lowerBound = "min";
        range.upperBound = "max";
        rangeList.add(range);
        return rangeList;
    }

    Document collStats = database.runCommand(new Document("collStats", collName));
    // collStats numeric fields may be int32, int64 or double depending on the server, so read
    // them through Number instead of assuming Integer.
    Object countObj = collStats.get("count");
    if (!(countObj instanceof Number)) {
        throw new IllegalStateException("collStats for " + dbName + "." + collName
                + " did not report a numeric document count: " + countObj);
    }
    long docCount = ((Number) countObj).longValue();
    if (docCount <= 0) {
        return rangeList;
    }
    Object avgObjSizeObj = collStats.get("avgObjSize");
    long avgObjSize = avgObjSizeObj instanceof Number ? ((Number) avgObjSizeObj).longValue() : 1L;

    int splitPointCount = adviceNumber - 1;
    String namespace = dbName + "." + collName;

    // Probe whether the user may run splitVector (requires the clusterManager role). The probe is
    // itself a forced median split, so its result is reused if a median split is what we need.
    boolean supportSplitVector = true;
    Document medianSplitResult = null;
    try {
        medianSplitResult = database.runCommand(new Document("splitVector", namespace)
                .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
                .append("force", true));
    } catch (MongoCommandException e) {
        if (e.getErrorCode() == KeyConstant.MONGO_UNAUTHORIZED_ERR_CODE
                || e.getErrorCode() == KeyConstant.MONGO_ILLEGALOP_ERR_CODE) {
            supportSplitVector = false;
        }
        // Any other command error is left for the splitVector call below to surface.
    }

    List<Object> splitPoints;
    if (supportSplitVector) {
        splitPoints = splitPointsFromSplitVector(database, namespace, docCount, avgObjSize,
                splitPointCount, medianSplitResult, isObjectId);
    } else {
        splitPoints = splitPointsBySkipping(database.getCollection(collName),
                docCount / adviceNumber, splitPointCount, isObjectId);
    }

    // Chain the split points into adjacent ranges bracketed by the "min" and "max" sentinels.
    Object lowerBound = "min";
    for (Object splitPoint : splitPoints) {
        Range range = new Range();
        range.lowerBound = lowerBound;
        range.upperBound = splitPoint;
        rangeList.add(range);
        lowerBound = splitPoint;
    }
    Range lastRange = new Range();
    lastRange.lowerBound = lowerBound;
    lastRange.upperBound = "max";
    rangeList.add(lastRange);
    return rangeList;
}

/**
 * Computes split points on {@code _id} with the {@code splitVector} command.
 *
 * <p>A size-based split limited to {@code splitPointCount} points is requested when the derived
 * chunk size is at least 1 MB; otherwise a single median split point is used.
 * {@code probeResult}, when non-null, is the result of an earlier forced median split on the same
 * namespace and is reused instead of running that command again.
 */
private static List<Object> splitPointsFromSplitVector(MongoDatabase database, String namespace,
        long docCount, long avgObjSize, int splitPointCount, Document probeResult,
        boolean isObjectId) {
    // Chunk size in MB: (documents per split point - 1) * average document size, doubled
    // (splitVector is understood to split at roughly half of maxChunkSize). Long arithmetic
    // avoids overflow on large collections.
    long maxChunkSizeMb = (docCount / splitPointCount - 1) * 2 * avgObjSize / (1024 * 1024);
    Document splitResult;
    if (maxChunkSizeMb >= 1) {
        splitResult = database.runCommand(new Document("splitVector", namespace)
                .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
                .append("maxChunkSize", (int) Math.min(maxChunkSizeMb, Integer.MAX_VALUE))
                .append("maxSplitPoints", splitPointCount));
    } else if (probeResult != null) {
        splitResult = probeResult;
    } else {
        splitResult = database.runCommand(new Document("splitVector", namespace)
                .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
                .append("force", true));
    }
    List<Object> splitPoints = new ArrayList<Object>();
    for (Object splitKey : (List<?>) splitResult.get("splitKeys")) {
        Object id = ((Document) splitKey).get(KeyConstant.MONGO_PRIMARY_ID);
        splitPoints.add(toSplitPoint(id, isObjectId));
    }
    return splitPoints;
}

/**
 * Computes split points without splitVector by taking the {@code _id} of every
 * {@code chunkDocCount}-th document in ascending {@code _id} order.
 *
 * <p>Stops early when the collection runs out of documents (fewer than collStats reported) or
 * when the skip offset would exceed the {@code int} range accepted by {@code skip}; in both cases
 * the remaining documents fall into the final range.
 */
private static List<Object> splitPointsBySkipping(MongoCollection<Document> collection,
        long chunkDocCount, int splitPointCount, boolean isObjectId) {
    List<Object> splitPoints = new ArrayList<Object>();
    // With fewer documents than requested ranges, step one document at a time rather than 0,
    // which would repeat the same split point.
    long step = Math.max(1L, chunkDocCount);
    // Range bounds are _id values, so split points must be taken in _id order to be monotonic;
    // natural (insertion/storage) order does not guarantee that.
    Document ascendingById = new Document(KeyConstant.MONGO_PRIMARY_ID, 1);
    long skipCount = step;
    for (int i = 0; i < splitPointCount && skipCount <= Integer.MAX_VALUE; i++) {
        Document doc = collection.find().sort(ascendingById).skip((int) skipCount).first();
        if (doc == null) {
            break;
        }
        splitPoints.add(toSplitPoint(doc.get(KeyConstant.MONGO_PRIMARY_ID), isObjectId));
        skipCount += step;
    }
    return splitPoints;
}

/** Converts an {@code _id} value to a range bound: hex string for ObjectIds, unchanged otherwise. */
private static Object toSplitPoint(Object id, boolean isObjectId) {
    return isObjectId ? ((ObjectId) id).toHexString() : id;
}