public static ChangeStreamIterable getChangeStreamIterable()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java [184:271]


    public static ChangeStreamIterable<Document> getChangeStreamIterable(
            MongoClient mongoClient,
            @Nullable String database,
            @Nullable String collection,
            @Nullable Pattern databaseRegex,
            @Nullable Pattern namespaceRegex,
            int batchSize,
            boolean updateLookup,
            boolean fullDocPrePostImage) {
        ChangeStreamIterable<Document> changeStream;
        if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
            MongoCollection<Document> coll =
                    mongoClient.getDatabase(database).getCollection(collection);
            LOG.info("Preparing change stream for collection {}.{}", database, collection);
            changeStream = coll.watch();
        } else if (StringUtils.isNotEmpty(database) && namespaceRegex != null) {
            MongoDatabase db = mongoClient.getDatabase(database);
            List<Bson> pipeline = new ArrayList<>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            Bson nsFilter =
                    Filters.regex(CollectionDiscoveryUtils.ADD_NS_FIELD_NAME, namespaceRegex);
            pipeline.add(match(nsFilter));
            LOG.info(
                    "Preparing change stream for database {} with namespace regex filter {}",
                    database,
                    namespaceRegex);
            changeStream = db.watch(pipeline);
        } else if (StringUtils.isNotEmpty(database)) {
            MongoDatabase db = mongoClient.getDatabase(database);
            LOG.info("Preparing change stream for database {}", database);
            changeStream = db.watch();
        } else if (namespaceRegex != null) {
            List<Bson> pipeline = new ArrayList<>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);

            Bson nsFilter =
                    Filters.regex(CollectionDiscoveryUtils.ADD_NS_FIELD_NAME, namespaceRegex);
            if (databaseRegex != null) {
                Bson dbFilter = regex("ns.db", databaseRegex);
                nsFilter = and(dbFilter, nsFilter);
                LOG.info(
                        "Preparing change stream for deployment with"
                                + " database regex filter {} and namespace regex filter {}",
                        databaseRegex,
                        namespaceRegex);
            } else {
                LOG.info(
                        "Preparing change stream for deployment with namespace regex filter {}",
                        namespaceRegex);
            }

            pipeline.add(match(nsFilter));
            changeStream = mongoClient.watch(pipeline);
        } else if (databaseRegex != null) {
            List<Bson> pipeline = new ArrayList<>();
            pipeline.add(match(regex("ns.db", databaseRegex)));

            LOG.info(
                    "Preparing change stream for deployment  with database regex filter {}",
                    databaseRegex);
            changeStream = mongoClient.watch(pipeline);
        } else {
            LOG.info("Preparing change stream for deployment");
            changeStream = mongoClient.watch();
        }

        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }

        if (fullDocPrePostImage) {
            if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
                // require both pre-image and post-image records
                changeStream.fullDocument(FullDocument.REQUIRED);
                changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
            } else {
                // for RegEx limited namespaces, use WHEN_AVAILABLE option
                // to avoid MongoDB complaining about missing pre- and post-image
                // coming from irrelevant collections
                changeStream.fullDocument(FullDocument.WHEN_AVAILABLE);
                changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
            }
        } else if (updateLookup) {
            changeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }

        return changeStream;
    }