in mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java [97:183]
/**
 * Reads every document in this task's primary-id range and sends each one to the
 * writer as a record whose columns follow the order of {@code mongodbColumnMeta}.
 *
 * <p>A column whose value cannot be found is emitted as a null {@code StringColumn}
 * so that the remaining columns keep their positions.
 *
 * @param recordSender used to create each record and hand it to the writer
 * @throws DataXException with {@code ILLEGAL_VALUE} when a required task field is
 *         null, or when an array-typed column has no splitter configured
 */
public void startRead(RecordSender recordSender) {
    if (lowerBound == null || upperBound == null ||
            mongoClient == null || database == null ||
            collection == null || mongodbColumnMeta == null) {
        throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
    }
    MongoDatabase db = mongoClient.getDatabase(database);
    MongoCollection<Document> col = db.getCollection(this.collection);
    Document filter = buildReadFilter();

    MongoCursor<Document> dbCursor = null;
    try {
        dbCursor = col.find(filter).iterator();
        while (dbCursor.hasNext()) {
            Document item = dbCursor.next();
            Record record = recordSender.createRecord();
            Iterator<?> columnItera = mongodbColumnMeta.iterator();
            while (columnItera.hasNext()) {
                JSONObject column = (JSONObject) columnItera.next();
                String columnName = column.getString(KeyConstant.COLUMN_NAME);
                Object tempCol = item.get(columnName);
                // A direct lookup misses fields addressed with a dotted path; for
                // document-typed columns, retry by walking the embedded documents.
                if (tempCol == null
                        && KeyConstant.isDocumentType(column.getString(KeyConstant.COLUMN_TYPE))) {
                    tempCol = resolveNestedValue(item, columnName);
                }
                if (tempCol == null) {
                    // Do not `continue` here: skipping the column would shift the
                    // following columns out of position at the destination.
                    record.addColumn(new StringColumn(null));
                } else if (tempCol instanceof Double) {
                    //TODO deal with Double.isNaN()
                    record.addColumn(new DoubleColumn((Double) tempCol));
                } else if (tempCol instanceof Boolean) {
                    record.addColumn(new BoolColumn((Boolean) tempCol));
                } else if (tempCol instanceof Date) {
                    record.addColumn(new DateColumn((Date) tempCol));
                } else if (tempCol instanceof Integer) {
                    record.addColumn(new LongColumn((Integer) tempCol));
                } else if (tempCol instanceof Long) {
                    record.addColumn(new LongColumn((Long) tempCol));
                } else if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
                    String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
                    if (Strings.isNullOrEmpty(splitter)) {
                        throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                                MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
                    }
                    // Array columns are flattened into one string joined by the splitter.
                    Iterable<?> array = (Iterable<?>) tempCol;
                    record.addColumn(new StringColumn(Joiner.on(splitter).join(array)));
                } else {
                    record.addColumn(new StringColumn(tempCol.toString()));
                }
            }
            recordSender.sendToWriter(record);
        }
    } finally {
        // Release the cursor even when conversion or sending fails part-way through.
        if (dbCursor != null) {
            dbCursor.close();
        }
    }
}

/**
 * Builds the query filter for this task: the half-open primary-id range
 * {@code [lowerBound, upperBound)}, AND-ed with the user-supplied {@code query}
 * when one is configured. A lower bound of {@code "min"} or an upper bound of
 * {@code "max"} leaves that side of the range unconstrained.
 */
private Document buildReadFilter() {
    Document range = new Document();
    if (!lowerBound.equals("min")) {
        range.append("$gte", toIdValue(lowerBound));
    }
    if (!upperBound.equals("max")) {
        range.append("$lt", toIdValue(upperBound));
    }

    Document filter = new Document();
    if (!range.isEmpty()) {
        filter.append(KeyConstant.MONGO_PRIMARY_ID, range);
    }
    if (!Strings.isNullOrEmpty(query)) {
        Document queryFilter = Document.parse(query);
        filter = new Document("$and", Arrays.asList(filter, queryFilter));
    }
    return filter;
}

/** Converts a range bound to an {@code ObjectId} when the task is flagged {@code isObjectId}. */
private Object toIdValue(Object bound) {
    return isObjectId ? new ObjectId(bound.toString()) : bound;
}

/**
 * Resolves a dotted field path such as {@code a.b.c} by descending through embedded
 * documents for every segment except the last, then reading the last segment.
 *
 * @return the value at the path, or null when the name has fewer than two segments
 *         or any intermediate segment is missing or is not a {@code Document}
 */
private Object resolveNestedValue(Document item, String dottedName) {
    String[] path = dottedName.split("\\.");
    if (path.length < 2) {
        return null;
    }
    Document current = item;
    for (int i = 0; i < path.length - 1; i++) {
        Object child = current.get(path[i]);
        if (!(child instanceof Document)) {
            // Broken path: report "not found" rather than reading the leaf name
            // from an unrelated ancestor document.
            return null;
        }
        current = (Document) child;
    }
    return current.get(path[path.length - 1]);
}