in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java [100:145]
/**
 * Builds one {@link SourceSchema} per collection of the configured MongoDB database that should
 * be synchronized, inferring each schema from a sample of the collection's documents.
 *
 * <p>Collections for which {@code isSyncNeeded} returns {@code false} are skipped. For each
 * remaining collection, the sample size is computed by {@code calculateSampleSize} from the
 * collection's estimated document count and the {@code MONGO_CDC_CREATE_SAMPLE_PERCENT},
 * {@code MONGO_CDC_MIN_SAMPLE_SIZE} and {@code MONGO_CDC_MAX_SAMPLE_SIZE} options. Every produced
 * schema uses the {@link DataModel#UNIQUE} model.
 *
 * @return the inferred schemas, in the order the collections are listed by MongoDB
 * @throws IllegalStateException if a collection selected for synchronization contains no
 *     documents, because no schema can be inferred for it
 * @throws Exception if connecting to MongoDB, sampling, or schema construction fails
 */
public List<SourceSchema> getSchemaList() throws Exception {
    String databaseName = config.get(MongoDBSourceOptions.DATABASE);
    List<SourceSchema> schemaList = new ArrayList<>();
    MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
    settingsBuilder.applyConnectionString(
            new ConnectionString(
                    buildConnectionString(
                            config.get(MongoDBSourceOptions.USERNAME),
                            config.get(MongoDBSourceOptions.PASSWORD),
                            config.get(MongoDBSourceOptions.SCHEME),
                            config.get(MongoDBSourceOptions.HOSTS),
                            config.get(MongoDBSourceOptions.CONNECTION_OPTIONS))));
    MongoClientSettings settings = settingsBuilder.build();
    Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
    Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE);
    Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE);
    // The client is only needed for schema discovery, so it is closed once all collections
    // have been sampled.
    try (MongoClient mongoClient = MongoClients.create(settings)) {
        MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
        MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
        for (String collectionName : collectionNames) {
            if (!isSyncNeeded(collectionName)) {
                continue;
            }
            MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
            // Probe for at least one document: an empty collection gives nothing to infer a
            // schema from. Name the collection so the user knows which one to fix or exclude.
            Document firstDocument = collection.find().first();
            if (firstDocument == null) {
                throw new IllegalStateException(
                        "No documents in collection '"
                                + collectionName
                                + "' of database '"
                                + databaseName
                                + "' to infer schema");
            }
            // estimatedDocumentCount() is the driver's metadata-based estimate, not an exact
            // count; it only drives the sample-size calculation.
            long totalDocuments = collection.estimatedDocumentCount();
            long sampleSize =
                    calculateSampleSize(
                            totalDocuments, samplePercent, minSampleSize, maxSampleSize);
            ArrayList<Document> documents = sampleData(collection, sampleSize);
            // NOTE(review): the meaning of the trailing null argument is defined by
            // MongoDBSchema's constructor; confirm there.
            MongoDBSchema mongoDBSchema =
                    new MongoDBSchema(documents, databaseName, collectionName, null);
            mongoDBSchema.setModel(DataModel.UNIQUE);
            schemaList.add(mongoDBSchema);
        }
    }
    return schemaList;
}