in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java [184:271]
/**
 * Opens a change stream at the narrowest scope described by the arguments and applies the
 * requested cursor options to it.
 *
 * <p>The scope is resolved in this order of precedence:
 *
 * <ol>
 *   <li>{@code database} and {@code collection} both non-empty: watch that single collection;
 *   <li>{@code database} non-empty and {@code namespaceRegex} set: watch the database,
 *       filtered by the namespace regex;
 *   <li>{@code database} non-empty: watch the whole database;
 *   <li>{@code namespaceRegex} set: watch the deployment, filtered by the namespace regex and,
 *       if present, additionally by {@code databaseRegex} on {@code ns.db};
 *   <li>{@code databaseRegex} set: watch the deployment, filtered on {@code ns.db};
 *   <li>otherwise: watch the whole deployment unfiltered.
 * </ol>
 *
 * @param mongoClient client used to open the change stream
 * @param database database name; when empty or {@code null} the stream is opened on the
 *     deployment
 * @param collection collection name; only taken into account when {@code database} is
 *     non-empty
 * @param databaseRegex pattern matched against {@code ns.db}; only taken into account when
 *     {@code database} is empty
 * @param namespaceRegex pattern matched against the namespace field added by {@link
 *     CollectionDiscoveryUtils#ADD_NS_FIELD}; not used when a single collection is watched
 * @param batchSize batch size to set on the stream; values {@code <= 0} leave it unset
 * @param updateLookup whether to request {@link FullDocument#UPDATE_LOOKUP}; only consulted
 *     when {@code fullDocPrePostImage} is {@code false}
 * @param fullDocPrePostImage whether to request pre- and post-image documents
 * @return the configured change stream iterable
 */
public static ChangeStreamIterable<Document> getChangeStreamIterable(
        MongoClient mongoClient,
        @Nullable String database,
        @Nullable String collection,
        @Nullable Pattern databaseRegex,
        @Nullable Pattern namespaceRegex,
        int batchSize,
        boolean updateLookup,
        boolean fullDocPrePostImage) {
    final boolean hasDatabase = StringUtils.isNotEmpty(database);
    final boolean singleCollection = hasDatabase && StringUtils.isNotEmpty(collection);

    final ChangeStreamIterable<Document> stream;
    if (singleCollection) {
        // Scope: exactly one collection.
        MongoCollection<Document> target =
                mongoClient.getDatabase(database).getCollection(collection);
        LOG.info("Preparing change stream for collection {}.{}", database, collection);
        stream = target.watch();
    } else if (hasDatabase) {
        // Scope: one database, optionally narrowed by a namespace regex.
        MongoDatabase targetDb = mongoClient.getDatabase(database);
        if (namespaceRegex != null) {
            List<Bson> stages = new ArrayList<>();
            stages.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            stages.add(
                    match(
                            Filters.regex(
                                    CollectionDiscoveryUtils.ADD_NS_FIELD_NAME,
                                    namespaceRegex)));
            LOG.info(
                    "Preparing change stream for database {} with namespace regex filter {}",
                    database,
                    namespaceRegex);
            stream = targetDb.watch(stages);
        } else {
            LOG.info("Preparing change stream for database {}", database);
            stream = targetDb.watch();
        }
    } else if (namespaceRegex != null) {
        // Scope: whole deployment, narrowed by namespace (and optionally database) regex.
        List<Bson> stages = new ArrayList<>();
        stages.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
        Bson namespaceFilter =
                Filters.regex(CollectionDiscoveryUtils.ADD_NS_FIELD_NAME, namespaceRegex);
        final Bson effectiveFilter;
        if (databaseRegex == null) {
            effectiveFilter = namespaceFilter;
            LOG.info(
                    "Preparing change stream for deployment with namespace regex filter {}",
                    namespaceRegex);
        } else {
            effectiveFilter = and(regex("ns.db", databaseRegex), namespaceFilter);
            LOG.info(
                    "Preparing change stream for deployment with database regex filter {}"
                            + " and namespace regex filter {}",
                    databaseRegex,
                    namespaceRegex);
        }
        stages.add(match(effectiveFilter));
        stream = mongoClient.watch(stages);
    } else if (databaseRegex != null) {
        // Scope: whole deployment, narrowed by database regex only.
        List<Bson> stages = new ArrayList<>();
        stages.add(match(regex("ns.db", databaseRegex)));
        LOG.info(
                "Preparing change stream for deployment with database regex filter {}",
                databaseRegex);
        stream = mongoClient.watch(stages);
    } else {
        // Scope: whole deployment, unfiltered.
        LOG.info("Preparing change stream for deployment");
        stream = mongoClient.watch();
    }

    if (batchSize > 0) {
        stream.batchSize(batchSize);
    }

    if (!fullDocPrePostImage) {
        if (updateLookup) {
            stream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        return stream;
    }

    // A single watched collection must deliver both pre- and post-images (REQUIRED).
    // Any broader scope uses WHEN_AVAILABLE to avoid MongoDB complaining about missing
    // pre- and post-images coming from irrelevant collections.
    FullDocument postImageMode =
            singleCollection ? FullDocument.REQUIRED : FullDocument.WHEN_AVAILABLE;
    FullDocumentBeforeChange preImageMode =
            singleCollection
                    ? FullDocumentBeforeChange.REQUIRED
                    : FullDocumentBeforeChange.WHEN_AVAILABLE;
    stream.fullDocument(postImageMode);
    stream.fullDocumentBeforeChange(preImageMode);
    return stream;
}