private void initCapturedCollections()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java [262:363]


    /**
     * Derives the capture scope from the database / collection include lists stored in
     * {@code props} and writes the resulting source settings back into {@code props}.
     *
     * <p>Three situations are distinguished:
     *
     * <ul>
     *   <li>a collection include list is present: either a single database + collection pair is
     *       written, or a change stream pipeline filtering on the namespace (and optionally on
     *       the database) together with a copy-existing namespace regex;
     *   <li>only a database include list is present: either a single database is written, or a
     *       pipeline filtering on {@code ns.db} together with a copy-existing namespace regex;
     *   <li>neither list is present: {@code props} is left untouched.
     * </ul>
     *
     * <p>A {@link MongoClient} is opened only for the duration of the name discovery and is
     * closed before any setting is written.
     *
     * @param props task properties; read for the connection URI and the include lists, and
     *     updated in place with the derived settings
     */
    private void initCapturedCollections(Map<String, String> props) {
        // Built up front, before looking at the include lists, exactly as the lists are read after.
        ConnectionString connectionString =
                new ConnectionString(props.get(MongoSourceConfig.CONNECTION_URI_CONFIG));

        List<String> databaseList = splitIncludeList(props.get(DATABASE_INCLUDE_LIST));
        List<String> collectionList = splitIncludeList(props.get(COLLECTION_INCLUDE_LIST));

        if (collectionList == null && databaseList == null) {
            // Watching all changes on the cluster by default, we do nothing here
            return;
        }

        if (collectionList == null) {
            // Watching databases changes
            List<String> discoveredDatabases;
            try (MongoClient mongoClient = MongoClients.create(connectionString)) {
                discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
            }

            if (isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                props.put(MongoSourceConfig.DATABASE_CONFIG, discoveredDatabases.get(0));
                return;
            }

            List<Bson> pipeline = new ArrayList<>();
            pipeline.add(match(regex("ns.db", joinIncludePatterns(databaseList))));
            props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));

            // One alternative per discovered database, each covering "<db>.<anything>".
            List<String> perDatabaseRegexes = new ArrayList<>();
            for (String db : discoveredDatabases) {
                perDatabaseRegexes.add(completionPattern(db + "\\..*").pattern());
            }
            props.put(
                    MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
                    String.join("|", perDatabaseRegexes));
            return;
        }

        // Watching collections changes
        List<String> discoveredDatabases;
        List<String> discoveredCollections;
        try (MongoClient mongoClient = MongoClients.create(connectionString)) {
            discoveredDatabases = databaseNames(mongoClient, databaseFilter(databaseList));
            discoveredCollections =
                    collectionNames(
                            mongoClient, discoveredDatabases, collectionsFilter(collectionList));
        }

        // case: database = db0, collection = coll1
        if (isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
            MongoNamespace namespace = new MongoNamespace(discoveredCollections.get(0));
            props.put(MongoSourceConfig.DATABASE_CONFIG, namespace.getDatabaseName());
            props.put(MongoSourceConfig.COLLECTION_CONFIG, namespace.getCollectionName());
            return;
        }

        // case: database = db0|db2, collection = (db0.coll[0-9])|(db1.coll[1-2])
        Bson nsFilter = regex(ADD_NS_FIELD_NAME, joinIncludePatterns(collectionList));
        if (databaseList != null) {
            if (isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                props.put(MongoSourceConfig.DATABASE_CONFIG, discoveredDatabases.get(0));
            } else {
                // Database restriction is applied in addition to the namespace restriction.
                nsFilter = and(regex("ns.db", joinIncludePatterns(databaseList)), nsFilter);
            }
        }

        List<Bson> pipeline = new ArrayList<>();
        pipeline.add(ADD_NS_FIELD);
        pipeline.add(match(nsFilter));
        props.put(MongoSourceConfig.PIPELINE_CONFIG, bsonListToJson(pipeline));

        // One alternative per discovered collection namespace.
        List<String> perNamespaceRegexes = new ArrayList<>();
        for (String ns : discoveredCollections) {
            perNamespaceRegexes.add(completionPattern(ns).pattern());
        }
        props.put(
                MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
                String.join("|", perNamespaceRegexes));
    }

    /**
     * Splits a comma-separated include list into its entries.
     *
     * @param includeList raw property value, may be {@code null}
     * @return the entries in order, or {@code null} when {@code includeList} is {@code null}
     */
    private List<String> splitIncludeList(String includeList) {
        return includeList == null ? null : Arrays.asList(includeList.split(","));
    }

    /**
     * Converts the include list to patterns via {@code includeListAsPatterns} and joins their
     * pattern strings with {@code |}.
     *
     * @param includeList include list entries
     * @return the pattern strings joined into a single alternation
     */
    private String joinIncludePatterns(List<String> includeList) {
        List<String> regexes = new ArrayList<>();
        for (Pattern pattern : includeListAsPatterns(includeList)) {
            regexes.add(pattern.pattern());
        }
        return String.join("|", regexes);
    }