public ScanRuntimeProvider getScanRuntimeProvider()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java [174:264]


    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
        // Assembles the scan runtime for this table in three steps:
        //   1. choose the deserializer that turns change events into RowData,
        //   2. resolve the database / collection filters handed to the source builder,
        //   3. build either the parallel source or the non-parallel source function,
        //      depending on enableParallelRead.
        final RowType physicalRowType =
                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
        final MetadataConverter[] converters = getMetadataConverters();
        final TypeInformation<RowData> producedTypeInfo =
                scanContext.createTypeInformation(producedDataType);

        // The full-changelog deserializer is used only when pre/post images are enabled.
        final DebeziumDeserializationSchema<RowData> rowDeserializer;
        if (enableFullDocPrePostImage) {
            rowDeserializer =
                    new MongoDBConnectorFullChangelogDeserializationSchema(
                            physicalRowType, converters, producedTypeInfo, localTimeZone);
        } else {
            rowDeserializer =
                    new MongoDBConnectorDeserializationSchema(
                            physicalRowType, converters, producedTypeInfo, localTimeZone);
        }

        // A filter stays null when its option is not set; with neither option set, all changes
        // on the cluster are watched and no filter is passed to the builder at all.
        final boolean databaseGiven = StringUtils.isNotEmpty(database);
        final boolean collectionGiven = StringUtils.isNotEmpty(collection);

        final String databaseFilter = databaseGiven ? database : null;
        final String collectionFilter;
        if (!collectionGiven) {
            collectionFilter = null;
        } else if (databaseGiven
                && !inferIsRegularExpression(database)
                && !inferIsRegularExpression(collection)) {
            // Both names are explicit (no regular expression): validate them and qualify the
            // collection with its database name.
            checkDatabaseNameValidity(database);
            checkCollectionNameValidity(collection);
            collectionFilter = database + "." + collection;
        } else {
            // Either a regular expression is involved or only the collection was given:
            // pass the collection through unchanged.
            collectionFilter = collection;
        }

        if (!enableParallelRead) {
            // Non-parallel path: SourceFunction-based MongoDBSource.
            final org.apache.flink.cdc.connectors.mongodb.MongoDBSource.Builder<RowData>
                    functionBuilder =
                            org.apache.flink.cdc.connectors.mongodb.MongoDBSource
                                    .<RowData>builder()
                                    .scheme(scheme)
                                    .hosts(hosts)
                                    .scanFullChangelog(enableFullDocPrePostImage)
                                    .startupOptions(startupOptions)
                                    .deserializer(rowDeserializer);

            // Optional settings are applied only when a value is present.
            Optional.ofNullable(databaseFilter).ifPresent(functionBuilder::databaseList);
            Optional.ofNullable(collectionFilter).ifPresent(functionBuilder::collectionList);
            Optional.ofNullable(username).ifPresent(functionBuilder::username);
            Optional.ofNullable(password).ifPresent(functionBuilder::password);
            Optional.ofNullable(connectionOptions).ifPresent(functionBuilder::connectionOptions);
            Optional.ofNullable(initialSnapshottingQueueSize)
                    .ifPresent(functionBuilder::initialSnapshottingQueueSize);
            Optional.ofNullable(initialSnapshottingMaxThreads)
                    .ifPresent(functionBuilder::initialSnapshottingMaxThreads);
            Optional.ofNullable(initialSnapshottingPipeline)
                    .ifPresent(functionBuilder::initialSnapshottingPipeline);
            Optional.ofNullable(batchSize).ifPresent(functionBuilder::batchSize);
            Optional.ofNullable(pollMaxBatchSize).ifPresent(functionBuilder::pollMaxBatchSize);
            Optional.ofNullable(pollAwaitTimeMillis)
                    .ifPresent(functionBuilder::pollAwaitTimeMillis);
            Optional.ofNullable(heartbeatIntervalMillis)
                    .ifPresent(functionBuilder::heartbeatIntervalMillis);

            // NOTE(review): the second argument is presumably the "bounded" flag of
            // SourceFunctionProvider.of, i.e. this source is unbounded - confirm against Flink.
            return SourceFunctionProvider.of(functionBuilder.build(), false);
        }

        // Parallel path: builder-based MongoDBSource wrapped in a SourceProvider.
        final MongoDBSourceBuilder<RowData> parallelBuilder =
                MongoDBSource.<RowData>builder()
                        .scheme(scheme)
                        .hosts(hosts)
                        .closeIdleReaders(closeIdlerReaders)
                        .scanFullChangelog(enableFullDocPrePostImage)
                        .startupOptions(startupOptions)
                        .skipSnapshotBackfill(skipSnapshotBackfill)
                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                        .deserializer(rowDeserializer)
                        .disableCursorTimeout(noCursorTimeout)
                        .assignUnboundedChunkFirst(assignUnboundedChunkFirst);

        // Optional settings are applied only when a value is present.
        Optional.ofNullable(databaseFilter).ifPresent(parallelBuilder::databaseList);
        Optional.ofNullable(collectionFilter).ifPresent(parallelBuilder::collectionList);
        Optional.ofNullable(username).ifPresent(parallelBuilder::username);
        Optional.ofNullable(password).ifPresent(parallelBuilder::password);
        Optional.ofNullable(connectionOptions).ifPresent(parallelBuilder::connectionOptions);
        Optional.ofNullable(batchSize).ifPresent(parallelBuilder::batchSize);
        Optional.ofNullable(pollMaxBatchSize).ifPresent(parallelBuilder::pollMaxBatchSize);
        Optional.ofNullable(pollAwaitTimeMillis).ifPresent(parallelBuilder::pollAwaitTimeMillis);
        Optional.ofNullable(heartbeatIntervalMillis)
                .ifPresent(parallelBuilder::heartbeatIntervalMillis);
        Optional.ofNullable(splitMetaGroupSize).ifPresent(parallelBuilder::splitMetaGroupSize);
        Optional.ofNullable(splitSizeMB).ifPresent(parallelBuilder::splitSizeMB);
        Optional.ofNullable(samplesPerChunk).ifPresent(parallelBuilder::samplesPerChunk);
        return SourceProvider.of(parallelBuilder.build());
    }