in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java [262:363]
/**
 * Derives the capture scope from the database / collection include lists stored in {@code props}
 * and writes the resulting source settings back into the same map.
 *
 * <ul>
 *   <li>Collection include list present: databases and collections are discovered on the server.
 *       If {@code isIncludeListExplicitlySpecified} accepts the collection list, the single
 *       discovered namespace is written as database + collection config. Otherwise a pipeline
 *       filtering on the namespace regex (optionally combined with a database regex) and a
 *       copy-existing namespace regex are written.
 *   <li>Only a database include list present: databases are discovered; either the single
 *       database is written as database config, or a pipeline matching {@code ns.db} and a
 *       copy-existing namespace regex are written.
 *   <li>Neither list present: {@code props} is left untouched.
 * </ul>
 *
 * @param props task properties; read for the connection URI and include lists, updated in place
 */
private void initCapturedCollections(Map<String, String> props) {
    // The connection string is built up front, before either include list is inspected.
    ConnectionString mongoUri =
            new ConnectionString(props.get(MongoSourceConfig.CONNECTION_URI_CONFIG));

    String rawDatabases = props.get(DATABASE_INCLUDE_LIST);
    String rawCollections = props.get(COLLECTION_INCLUDE_LIST);

    // A missing property stays null (not an empty list); the branches below depend on that.
    List<String> databaseList =
            rawDatabases == null ? null : Arrays.asList(rawDatabases.split(","));
    List<String> collectionList =
            rawCollections == null ? null : Arrays.asList(rawCollections.split(","));

    if (collectionList == null && databaseList == null) {
        // No include list at all: nothing is configured here.
        return;
    }

    if (collectionList == null) {
        // Only databases were listed.
        List<String> foundDatabases;
        try (MongoClient client = MongoClients.create(mongoUri)) {
            foundDatabases = databaseNames(client, databaseFilter(databaseList));
        }

        if (isIncludeListExplicitlySpecified(databaseList, foundDatabases)) {
            props.put(MongoSourceConfig.DATABASE_CONFIG, foundDatabases.get(0));
            return;
        }

        List<Bson> databasePipeline = new ArrayList<>();
        databasePipeline.add(match(regex("ns.db", joinIncludePatterns(databaseList))));
        props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(databasePipeline));

        // Copy-existing regex: every collection inside each discovered database.
        String existingDatabasesRegex =
                foundDatabases.stream()
                        .map(name -> completionPattern(name + "\\..*").pattern())
                        .collect(Collectors.joining("|"));
        props.put(
                MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, existingDatabasesRegex);
        return;
    }

    // Collections were listed (the database list may or may not be present).
    List<String> foundDatabases;
    List<String> foundNamespaces;
    try (MongoClient client = MongoClients.create(mongoUri)) {
        foundDatabases = databaseNames(client, databaseFilter(databaseList));
        foundNamespaces =
                collectionNames(client, foundDatabases, collectionsFilter(collectionList));
    }

    if (isIncludeListExplicitlySpecified(collectionList, foundNamespaces)) {
        // e.g. database = db0, collection = coll1
        MongoNamespace soleNamespace = new MongoNamespace(foundNamespaces.get(0));
        props.put(MongoSourceConfig.DATABASE_CONFIG, soleNamespace.getDatabaseName());
        props.put(MongoSourceConfig.COLLECTION_CONFIG, soleNamespace.getCollectionName());
        return;
    }

    // e.g. database = db0|db2, collection = (db0.coll[0-9])|(db1.coll[1-2])
    Bson namespaceFilter = regex(ADD_NS_FIELD_NAME, joinIncludePatterns(collectionList));
    if (databaseList != null) {
        if (isIncludeListExplicitlySpecified(databaseList, foundDatabases)) {
            props.put(MongoSourceConfig.DATABASE_CONFIG, foundDatabases.get(0));
        } else {
            // Several / pattern-based databases: also constrain on the database name.
            namespaceFilter =
                    and(regex("ns.db", joinIncludePatterns(databaseList)), namespaceFilter);
        }
    }

    List<Bson> namespacePipeline = new ArrayList<>();
    namespacePipeline.add(ADD_NS_FIELD);
    namespacePipeline.add(match(namespaceFilter));
    props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(namespacePipeline));

    // Copy-existing regex: one alternative per discovered namespace.
    String existingNamespacesRegex =
            foundNamespaces.stream()
                    .map(namespace -> completionPattern(namespace).pattern())
                    .collect(Collectors.joining("|"));
    props.put(
            MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, existingNamespacesRegex);
}

/**
 * Turns an include list into patterns via {@code includeListAsPatterns} and joins the pattern
 * sources with {@code |}.
 */
private String joinIncludePatterns(List<String> includeList) {
    return includeListAsPatterns(includeList).stream()
            .map(Pattern::pattern)
            .collect(Collectors.joining("|"));
}