public List<SourceSchema> getSchemaList()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java [100:145]


    /**
     * Infers one schema per synchronized collection of the configured MongoDB database.
     *
     * <p>A client is created from the configured username, password, scheme, hosts and connection
     * options. Every collection name of the database is visited; names rejected by {@code
     * isSyncNeeded} are skipped. For each remaining collection a sample of documents is drawn (the
     * sample size is derived by {@code calculateSampleSize} from the estimated document count and
     * the configured sample percent, minimum and maximum) and passed to {@code MongoDBSchema},
     * whose model is then set to {@code DataModel.UNIQUE}.
     *
     * @return the schemas of all collections selected for synchronization, in the order the
     *     collection names were listed
     * @throws IllegalStateException if a selected collection contains no documents, so no schema
     *     can be inferred for it
     * @throws Exception any failure raised by the driver calls or helper methods used here; nothing
     *     is caught in this method
     */
    public List<SourceSchema> getSchemaList() throws Exception {
        String databaseName = config.get(MongoDBSourceOptions.DATABASE);
        List<SourceSchema> schemaList = new ArrayList<>();
        MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

        settingsBuilder.applyConnectionString(
                new ConnectionString(
                        buildConnectionString(
                                config.get(MongoDBSourceOptions.USERNAME),
                                config.get(MongoDBSourceOptions.PASSWORD),
                                config.get(MongoDBSourceOptions.SCHEME),
                                config.get(MongoDBSourceOptions.HOSTS),
                                config.get(MongoDBSourceOptions.CONNECTION_OPTIONS))));

        MongoClientSettings settings = settingsBuilder.build();
        Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
        Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE);
        Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE);
        // The client is only needed while sampling; try-with-resources closes it afterwards.
        try (MongoClient mongoClient = MongoClients.create(settings)) {
            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
            MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
            for (String collectionName : collectionNames) {
                if (!isSyncNeeded(collectionName)) {
                    continue;
                }
                MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
                // An empty collection gives nothing to infer a schema from. Name the offending
                // collection so the failure is actionable when many collections are synchronized.
                if (collection.find().first() == null) {
                    throw new IllegalStateException(
                            "No documents in collection to infer schema: "
                                    + databaseName
                                    + "."
                                    + collectionName);
                }

                long totalDocuments = collection.estimatedDocumentCount();
                long sampleSize =
                        calculateSampleSize(
                                totalDocuments, samplePercent, minSampleSize, maxSampleSize);

                ArrayList<Document> documents = sampleData(collection, sampleSize);
                // NOTE(review): the trailing null is presumably the table comment - confirm
                // against the MongoDBSchema constructor.
                MongoDBSchema mongoDBSchema =
                        new MongoDBSchema(documents, databaseName, collectionName, null);
                mongoDBSchema.setModel(DataModel.UNIQUE);
                schemaList.add(mongoDBSchema);
            }
        }

        return schemaList;
    }