in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java [79:169]
/**
 * Creates the PostgreSQL CDC {@link DynamicTableSource} described by the table options in
 * {@code context}.
 *
 * <p>The method validates the factory options (option keys starting with
 * {@code DEBEZIUM_OPTIONS_PREFIX} are exempt from that validation), reads every connector
 * option, applies the cross-option checks below and finally hands all values to
 * {@code PostgreSQLTableSource}:
 *
 * <ul>
 *   <li>upsert changelog mode requires the physical schema to declare a primary key;
 *   <li>with incremental snapshot enabled, the chunk/fetch/pool sizes, retry count and
 *       distribution factors are validated;
 *   <li>with incremental snapshot disabled, the {@code latest-offset} startup mode is
 *       rejected.
 * </ul>
 *
 * @param context factory context providing the catalog table and its options
 * @return the configured table source
 */
public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
    final FactoryUtil.TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(this, context);
    factoryHelper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
    final ReadableConfig options = factoryHelper.getOptions();

    // Where to connect and with which credentials.
    final String host = options.get(HOSTNAME);
    final int pgPort = options.get(PG_PORT);
    final String user = options.get(USERNAME);
    final String userPassword = options.get(PASSWORD);

    // Which database object to capture.
    final String database = options.get(DATABASE_NAME);
    final String schema = options.get(SCHEMA_NAME);
    final String table = options.get(TABLE_NAME);

    // Decoding plugin, replication slot and emitted changelog flavour.
    final String decodingPlugin = options.get(DECODING_PLUGIN_NAME);
    final String replicationSlot = options.get(SLOT_NAME);
    final DebeziumChangelogMode mode = options.get(CHANGELOG_MODE);

    final ResolvedSchema tableSchema =
            getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
    // The primary key is only inspected when upsert mode was requested (short-circuit).
    checkArgument(
            mode != DebeziumChangelogMode.UPSERT || tableSchema.getPrimaryKey().isPresent(),
            "Primary key must be present when upsert mode is selected.");

    final boolean incrementalSnapshot = options.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
    final StartupOptions startup = getStartupOptions(options);

    // Snapshot chunking.
    final int chunkSize = options.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
    final int chunkMetaGroupSize = options.get(CHUNK_META_GROUP_SIZE);
    final int snapshotFetchSize = options.get(SCAN_SNAPSHOT_FETCH_SIZE);
    final double evenDistributionUpper =
            options.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
    final double evenDistributionLower =
            options.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
    // No chunk key column configured is represented as null.
    final String chunkKey =
            options.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);

    // Connection handling.
    final Duration connectionTimeout = options.get(CONNECT_TIMEOUT);
    final int maxConnectRetries = options.get(CONNECT_MAX_RETRIES);
    final int poolSize = options.get(CONNECTION_POOL_SIZE);
    final Duration heartbeat = options.get(HEARTBEAT_INTERVAL);

    // Remaining scan switches.
    final boolean closeIdleReaders = options.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
    final boolean skipBackfill = options.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
    final boolean scanNewlyAddedTables = options.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
    final int lsnCommitDelay = options.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
    final boolean unboundedChunkFirst =
            options.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);

    if (!incrementalSnapshot) {
        checkState(
                !StartupMode.LATEST_OFFSET.equals(startup.startupMode),
                "The Postgres CDC connector does not support 'latest-offset' startup mode when 'scan.incremental.snapshot.enabled' is disabled, you can enable 'scan.incremental.snapshot.enabled' to use this startup mode.");
    } else {
        // These options are only validated when incremental snapshot reading is on.
        validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, chunkSize, 1);
        validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, snapshotFetchSize, 1);
        validateIntegerOption(CHUNK_META_GROUP_SIZE, chunkMetaGroupSize, 1);
        validateIntegerOption(JdbcSourceOptions.CONNECTION_POOL_SIZE, poolSize, 1);
        validateIntegerOption(JdbcSourceOptions.CONNECT_MAX_RETRIES, maxConnectRetries, 0);
        validateDistributionFactorUpper(evenDistributionUpper);
        validateDistributionFactorLower(evenDistributionLower);
    }

    OptionUtils.printOptions(IDENTIFIER, ((Configuration) options).toMap());

    // Argument order is positional and must match the PostgreSQLTableSource constructor.
    return new PostgreSQLTableSource(
            tableSchema,
            pgPort,
            host,
            database,
            schema,
            table,
            user,
            userPassword,
            decodingPlugin,
            replicationSlot,
            mode,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            incrementalSnapshot,
            chunkSize,
            chunkMetaGroupSize,
            snapshotFetchSize,
            connectionTimeout,
            maxConnectRetries,
            poolSize,
            evenDistributionUpper,
            evenDistributionLower,
            heartbeat,
            startup,
            chunkKey,
            closeIdleReaders,
            skipBackfill,
            scanNewlyAddedTables,
            lsnCommitDelay,
            unboundedChunkFirst);
}