public DynamicTableSource createDynamicTableSource(Context context)

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java [75:177]


    public DynamicTableSource createDynamicTableSource(Context context) {
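        // Validate the supplied table options against this factory's required/optional option sets.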
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        final ReadableConfig config = helper.getOptions();

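        // Connection settings: scheme, host list, extra connection options, plus optional
        // credentials and the target database and collection.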
        String scheme = config.get(SCHEME);
        String hosts = config.get(HOSTS);
        String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);

        String username = config.getOptional(USERNAME).orElse(null);
        String password = config.getOptional(PASSWORD).orElse(null);

        String database = config.getOptional(DATABASE).orElse(null);
        String collection = config.getOptional(COLLECTION).orElse(null);

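        // Cursor batch size and change stream polling limits.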
        Integer batchSize = config.get(BATCH_SIZE);
        Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
        Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);

        Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS);

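        // Startup mode, plus initial snapshot tuning options that only apply in Debezium mode
        // (see the checkArgument below).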
        StartupOptions startupOptions = getStartupOptions(config);
        Integer initialSnapshottingQueueSize =
                config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null);
        Integer initialSnapshottingMaxThreads =
                config.getOptional(INITIAL_SNAPSHOTTING_MAX_THREADS).orElse(null);
        String initialSnapshottingPipeline =
                config.getOptional(INITIAL_SNAPSHOTTING_PIPELINE).orElse(null);

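        // Resolve the session time zone; the option's built-in default means "use the JVM system default".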
        String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
        ZoneId localTimeZone =
                TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
                        ? ZoneId.systemDefault()
                        : ZoneId.of(zoneId);

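        // Whether to use the incremental snapshot (parallel) source instead of the legacy Debezium source.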
        boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);

        // The initial.snapshotting.* options only apply in Debezium mode and cannot be used
        // in incremental snapshot mode, where their semantics would be inconsistent: during
        // the snapshot phase of incremental snapshot mode, the oplog is backfilled after each
        // snapshot split to compensate for concurrent changes, but the pipeline operations
        // from initial.snapshotting.pipeline are not applied to that backfill oplog.
        checkArgument(
                !(enableParallelRead
                        && (initialSnapshottingPipeline != null
                                || initialSnapshottingMaxThreads != null
                                || initialSnapshottingQueueSize != null)),
                "The initial.snapshotting.*/copy.existing.* config only applies to Debezium mode, "
                        + "not incremental snapshot mode");

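        // Incremental snapshot tuning: idle reader shutdown, backfill skipping, newly added
        // table discovery, unbounded-chunk-first assignment, chunk sizing and sampling.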
        boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
        boolean assignUnboundedChunkFirst =
                config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);

        int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
        int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
        int samplesPerChunk = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES);

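        // Requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on the collection.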
        boolean enableFullDocumentPrePostImage =
                config.getOptional(FULL_DOCUMENT_PRE_POST_IMAGE).orElse(false);

        boolean noCursorTimeout = config.getOptional(SCAN_NO_CURSOR_TIMEOUT).orElse(true);
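        // Derive the physical schema and require the primary key to be exactly the _id column,
        // since MongoDB keys every change event by _id.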
        ResolvedSchema physicalSchema =
                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
        checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
        checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");

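        // Log the effective connector options for troubleshooting.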
        OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());

        return new MongoDBTableSource(
                physicalSchema,
                scheme,
                hosts,
                username,
                password,
                database,
                collection,
                connectionOptions,
                startupOptions,
                initialSnapshottingQueueSize,
                initialSnapshottingMaxThreads,
                initialSnapshottingPipeline,
                batchSize,
                pollMaxBatchSize,
                pollAwaitTimeMillis,
                heartbeatIntervalMillis,
                localTimeZone,
                enableParallelRead,
                splitMetaGroupSize,
                splitSizeMB,
                samplesPerChunk,
                enableCloseIdleReaders,
                enableFullDocumentPrePostImage,
                noCursorTimeout,
                skipSnapshotBackfill,
                scanNewlyAddedTableEnabled,
                assignUnboundedChunkFirst);
    }
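
For reference, here is a minimal sketch of how this factory is typically exercised through the Table API. It is illustrative only: the table name, columns, and connection values are hypothetical, and the string option keys are taken from the MongoDB CDC connector documentation, so verify them against the connector version in use.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class MongoDBCdcSourceExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The primary key must be the _id column (declared NOT NULL), matching the
        // checkPrimaryKey validation in the factory above.
        Schema schema =
                Schema.newBuilder()
                        .column("_id", DataTypes.STRING().notNull())
                        .column("name", DataTypes.STRING())
                        .primaryKey("_id")
                        .build();

        // Hypothetical connection values; option keys follow the MongoDB CDC docs.
        TableDescriptor descriptor =
                TableDescriptor.forConnector("mongodb-cdc")
                        .schema(schema)
                        .option("hosts", "localhost:27017")
                        .option("database", "inventory")
                        .option("collection", "products")
                        .option("scan.incremental.snapshot.enabled", "true")
                        .build();

        tEnv.createTemporaryTable("mongo_products", descriptor);
        tEnv.executeSql("SELECT * FROM mongo_products").print();
    }
}

Because scan.incremental.snapshot.enabled is set to true here, none of the initial.snapshotting.* options may be set, per the checkArgument in the factory above.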