in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java [122:267]
public void configure(SourceSplitBase sourceSplitBase) {
LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
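// Tailor the Debezium config to the split type: a snapshot split narrows the
// table include list to its single table, uses a dedicated backfill slot that is
// dropped on stop, and disables heartbeats; a stream split keeps the shared slot.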
if (sourceSplitBase instanceof SnapshotSplit) {
dbzConfig =
new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
.with(
"table.include.list",
((SnapshotSplit) sourceSplitBase)
.getTableId()
.toString())
.with(
SLOT_NAME.name(),
((PostgresSourceConfig) sourceConfig)
.getSlotNameForBackfillTask())
// drop the slot once the backfill stream split finishes
.with(DROP_SLOT_ON_STOP.name(), true)
// disable heartbeat events in the snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build());
} else {
dbzConfig =
new PostgresConnectorConfig(
dbzConfig
.getConfig()
.edit()
// never drop the slot for the stream split, which is also the global split
.with(DROP_SLOT_ON_STOP.name(), false)
.build());
}
LOG.info("PostgresConnectorConfig is ", dbzConfig.getConfig().asProperties().toString());
setDbzConnectorConfig(dbzConfig);
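// Resolve the snapshotter from the configured snapshot.mode; it later decides
// whether the replication connection should expect an initial snapshot.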
PostgresConnectorConfig.SnapshotMode snapshotMode =
PostgresConnectorConfig.SnapshotMode.parse(
dbzConfig.getConfig().getString(SNAPSHOT_MODE));
this.snapShotter = snapshotMode.getSnapshotter(dbzConfig.getConfig());
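// Build the value converters and open the JDBC connection backing schema and
// type-registry lookups, then derive the topic selector for this config.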
PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
newPostgresValueConverterBuilder(dbzConfig);
this.jdbcConnection =
new PostgresConnection(
dbzConfig.getJdbcConfig(), valueConverterBuilder, CONNECTION_NAME);
TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(dbzConfig);
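// Register the split's table schemas with the embedded database history so they
// can be restored without re-reading them from the database.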
EmbeddedFlinkDatabaseHistory.registerHistory(
sourceConfig
.getDbzConfiguration()
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
sourceSplitBase.getTableSchemas().values());
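// Build the Debezium schema objects; any SQLException here is unrecoverable.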
try {
this.schema =
PostgresObjectUtils.newSchema(
jdbcConnection,
dbzConfig,
jdbcConnection.getTypeRegistry(),
topicSelector,
valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize PostgresSchema", e);
}
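// Restore the starting offset for this split, then create the logical partition
// and the Debezium task context.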
this.offsetContext =
loadStartingOffsetState(
new PostgresOffsetContext.Loader(dbzConfig), sourceSplitBase);
this.partition = new PostgresPartition(dbzConfig.getLogicalName());
this.taskContext = PostgresObjectUtils.newTaskContext(dbzConfig, schema, topicSelector);
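// Create the replication (slot) connection once and reuse it for later splits.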
if (replicationConnection == null) {
replicationConnection =
createReplicationConnection(
this.taskContext,
jdbcConnection,
this.snapShotter.shouldSnapshot(),
dbzConfig);
}
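// Queue that hands change events from the Debezium source to the split reader;
// batch/queue sizing and the poll interval come from the connector config.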
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(dbzConfig.getPollInterval())
.maxBatchSize(dbzConfig.getMaxBatchSize())
.maxQueueSize(dbzConfig.getMaxQueueSize())
.maxQueueSizeInBytes(dbzConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(
() ->
taskContext.configureLoggingContext(
"postgres-cdc-connector-task"))
// do not buffer any elements; signal events are used instead
// .buffering()
.build();
this.errorHandler = new PostgresErrorHandler(getDbzConnectorConfig(), queue);
this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider();
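// Capture the config in an effectively final variable for the lambdas below.
// The dispatcher publishes change events and heartbeats to the queue; its
// heartbeat error handler treats 57P01 (admin_shutdown) as fatal and
// 57P03 (cannot_connect_now) as retriable.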
PostgresConnectorConfig finalDbzConfig = dbzConfig;
this.postgresDispatcher =
new CDCPostgresDispatcher(
finalDbzConfig,
topicSelector,
schema,
queue,
finalDbzConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
new HeartbeatFactory<>(
dbzConfig,
topicSelector,
schemaNameAdjuster,
() ->
new PostgresConnection(
finalDbzConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL),
exception -> {
String sqlErrorId = exception.getSQLState();
switch (sqlErrorId) {
case "57P01":
// Postgres error admin_shutdown, see
// https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new DebeziumException(
"Could not execute heartbeat action query (Error: "
+ sqlErrorId
+ ")",
exception);
case "57P03":
// Postgres error cannot_connect_now, see
// https://www.postgresql.org/docs/12/errcodes-appendix.html
throw new RetriableException(
"Could not execute heartbeat action query (Error: "
+ sqlErrorId
+ ")",
exception);
default:
break;
}
}),
schemaNameAdjuster);
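// Expose snapshot-phase metrics for this fetch task.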
ChangeEventSourceMetricsFactory<PostgresPartition> metricsFactory =
new DefaultChangeEventSourceMetricsFactory<>();
this.snapshotChangeEventSourceMetrics =
metricsFactory.getSnapshotMetrics(taskContext, queue, metadataProvider);
}