in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java [174:264]
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
enableFullDocPrePostImage
? new MongoDBConnectorFullChangelogDeserializationSchema(
physicalDataType, metadataConverters, typeInfo, localTimeZone)
: new MongoDBConnectorDeserializationSchema(
physicalDataType, metadataConverters, typeInfo, localTimeZone);
String databaseList = null;
String collectionList = null;
if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
// explicitly specified database and collection.
if (!inferIsRegularExpression(database) && !inferIsRegularExpression(collection)) {
checkDatabaseNameValidity(database);
checkCollectionNameValidity(collection);
databaseList = database;
collectionList = database + "." + collection;
} else {
databaseList = database;
collectionList = collection;
}
} else if (StringUtils.isNotEmpty(database)) {
databaseList = database;
} else if (StringUtils.isNotEmpty(collection)) {
collectionList = collection;
} else {
// Watching all changes on the cluster by default, we do nothing here
}
if (enableParallelRead) {
MongoDBSourceBuilder<RowData> builder =
MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.closeIdleReaders(closeIdlerReaders)
.scanFullChangelog(enableFullDocPrePostImage)
.startupOptions(startupOptions)
.skipSnapshotBackfill(skipSnapshotBackfill)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.deserializer(deserializer)
.disableCursorTimeout(noCursorTimeout)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
Optional.ofNullable(heartbeatIntervalMillis)
.ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
Optional.ofNullable(samplesPerChunk).ifPresent(builder::samplesPerChunk);
return SourceProvider.of(builder.build());
} else {
org.apache.flink.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder =
org.apache.flink.cdc.connectors.mongodb.MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.scanFullChangelog(enableFullDocPrePostImage)
.startupOptions(startupOptions)
.deserializer(deserializer);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(initialSnapshottingQueueSize)
.ifPresent(builder::initialSnapshottingQueueSize);
Optional.ofNullable(initialSnapshottingMaxThreads)
.ifPresent(builder::initialSnapshottingMaxThreads);
Optional.ofNullable(initialSnapshottingPipeline)
.ifPresent(builder::initialSnapshottingPipeline);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
Optional.ofNullable(heartbeatIntervalMillis)
.ifPresent(builder::heartbeatIntervalMillis);
return SourceFunctionProvider.of(builder.build(), false);
}
}