in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java [75:177]
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
final ReadableConfig config = helper.getOptions();
String scheme = config.get(SCHEME);
String hosts = config.get(HOSTS);
String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
String username = config.getOptional(USERNAME).orElse(null);
String password = config.getOptional(PASSWORD).orElse(null);
String database = config.getOptional(DATABASE).orElse(null);
String collection = config.getOptional(COLLECTION).orElse(null);
Integer batchSize = config.get(BATCH_SIZE);
Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS);
StartupOptions startupOptions = getStartupOptions(config);
Integer initialSnapshottingQueueSize =
config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null);
Integer initialSnapshottingMaxThreads =
config.getOptional(INITIAL_SNAPSHOTTING_MAX_THREADS).orElse(null);
String initialSnapshottingPipeline =
config.getOptional(INITIAL_SNAPSHOTTING_PIPELINE).orElse(null);
String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
ZoneId localTimeZone =
TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
? ZoneId.systemDefault()
: ZoneId.of(zoneId);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
// The initial.snapshotting.pipeline related config is only used in Debezium mode and
// cannot be used in incremental snapshot mode because the semantic is inconsistent.
// The reason is that in snapshot phase of incremental snapshot mode, the oplog
// will be backfilled after each snapshot to compensate for changes, but the pipeline
// operations in initial.snapshotting.pipeline are not applied to the backfill oplog,
// which means the semantic of this config is inconsistent.
checkArgument(
!(enableParallelRead
&& (initialSnapshottingPipeline != null
|| initialSnapshottingMaxThreads != null
|| initialSnapshottingQueueSize != null)),
"The initial.snapshotting.*/copy.existing.* config only applies to Debezium mode, "
+ "not incremental snapshot mode");
boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean assignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
int samplesPerChunk = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SAMPLES);
boolean enableFullDocumentPrePostImage =
config.getOptional(FULL_DOCUMENT_PRE_POST_IMAGE).orElse(false);
boolean noCursorTimeout = config.getOptional(SCAN_NO_CURSOR_TIMEOUT).orElse(true);
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");
OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
return new MongoDBTableSource(
physicalSchema,
scheme,
hosts,
username,
password,
database,
collection,
connectionOptions,
startupOptions,
initialSnapshottingQueueSize,
initialSnapshottingMaxThreads,
initialSnapshottingPipeline,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
heartbeatIntervalMillis,
localTimeZone,
enableParallelRead,
splitMetaGroupSize,
splitSizeMB,
samplesPerChunk,
enableCloseIdleReaders,
enableFullDocumentPrePostImage,
noCursorTimeout,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
assignUnboundedChunkFirst);
}