public void configure()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java [122:267]


    /**
     * Prepares this fetch task context for reading the given split.
     *
     * <p>The Debezium connector configuration is first specialised for the split type:
     *
     * <ul>
     *   <li>For a {@link SnapshotSplit}, the table include list is narrowed to the split's table,
     *       the slot name is replaced by the backfill-task slot name, the slot is dropped on
     *       stop, and the heartbeat interval is set to 0.
     *   <li>For any other split, the slot is configured to never be dropped on stop.
     * </ul>
     *
     * <p>The resulting configuration is stored via {@code setDbzConnectorConfig} and then used to
     * create the snapshotter, JDBC connection, schema, offset context, partition, task context,
     * replication connection (only if none exists yet), change event queue, error handler,
     * event metadata provider, dispatcher and snapshot metrics.
     *
     * @param sourceSplitBase the split this context is being configured for
     * @throws RuntimeException if building the Postgres schema fails with a {@link SQLException}
     */
    public void configure(SourceSplitBase sourceSplitBase) {
        LOG.debug("Configuring PostgresSourceFetchTaskContext for split: {}", sourceSplitBase);
        PostgresConnectorConfig dbzConfig = getDbzConnectorConfig();
        if (sourceSplitBase instanceof SnapshotSplit) {
            dbzConfig =
                    new PostgresConnectorConfig(
                            dbzConfig
                                    .getConfig()
                                    .edit()
                                    // only capture the table this snapshot split belongs to
                                    .with(
                                            "table.include.list",
                                            ((SnapshotSplit) sourceSplitBase)
                                                    .getTableId()
                                                    .toString())
                                    // use the slot name reserved for the backfill task
                                    .with(
                                            SLOT_NAME.name(),
                                            ((PostgresSourceConfig) sourceConfig)
                                                    .getSlotNameForBackfillTask())
                                    // drop slot for backfill stream split
                                    .with(DROP_SLOT_ON_STOP.name(), true)
                                    // Disable heartbeat event in snapshot split fetcher
                                    .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                                    .build());
        } else {
            dbzConfig =
                    new PostgresConnectorConfig(
                            dbzConfig
                                    .getConfig()
                                    .edit()
                                    // never drop slot for stream split, which is also global split
                                    .with(DROP_SLOT_ON_STOP.name(), false)
                                    .build());
        }

        // The "{}" placeholder is required for SLF4J to render the argument at all. The
        // configuration is logged through its password-masked view so that credentials are
        // not written to the log.
        LOG.info(
                "PostgresConnectorConfig is {}",
                dbzConfig.getConfig().withMaskedPasswords().asProperties());
        setDbzConnectorConfig(dbzConfig);
        PostgresConnectorConfig.SnapshotMode snapshotMode =
                PostgresConnectorConfig.SnapshotMode.parse(
                        dbzConfig.getConfig().getString(SNAPSHOT_MODE));
        this.snapShotter = snapshotMode.getSnapshotter(dbzConfig.getConfig());

        PostgresConnection.PostgresValueConverterBuilder valueConverterBuilder =
                newPostgresValueConverterBuilder(dbzConfig);
        this.jdbcConnection =
                new PostgresConnection(
                        dbzConfig.getJdbcConfig(), valueConverterBuilder, CONNECTION_NAME);

        TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(dbzConfig);
        // Register the table schemas carried by the split under the configured history
        // instance name.
        EmbeddedFlinkDatabaseHistory.registerHistory(
                sourceConfig
                        .getDbzConfiguration()
                        .getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
                sourceSplitBase.getTableSchemas().values());

        try {
            this.schema =
                    PostgresObjectUtils.newSchema(
                            jdbcConnection,
                            dbzConfig,
                            jdbcConnection.getTypeRegistry(),
                            topicSelector,
                            valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
        } catch (SQLException e) {
            throw new RuntimeException("Failed to initialize PostgresSchema", e);
        }

        this.offsetContext =
                loadStartingOffsetState(
                        new PostgresOffsetContext.Loader(dbzConfig), sourceSplitBase);
        this.partition = new PostgresPartition(dbzConfig.getLogicalName());
        this.taskContext = PostgresObjectUtils.newTaskContext(dbzConfig, schema, topicSelector);

        // An already existing replication connection is kept; one is only created when absent.
        if (replicationConnection == null) {
            replicationConnection =
                    createReplicationConnection(
                            this.taskContext,
                            jdbcConnection,
                            this.snapShotter.shouldSnapshot(),
                            dbzConfig);
        }

        this.queue =
                new ChangeEventQueue.Builder<DataChangeEvent>()
                        .pollInterval(dbzConfig.getPollInterval())
                        .maxBatchSize(dbzConfig.getMaxBatchSize())
                        .maxQueueSize(dbzConfig.getMaxQueueSize())
                        .maxQueueSizeInBytes(dbzConfig.getMaxQueueSizeInBytes())
                        .loggingContextSupplier(
                                () ->
                                        taskContext.configureLoggingContext(
                                                "postgres-cdc-connector-task"))
                        // do not buffer any element, we use signal event
                        // .buffering()
                        .build();

        this.errorHandler = new PostgresErrorHandler(getDbzConnectorConfig(), queue);
        this.metadataProvider = PostgresObjectUtils.newEventMetadataProvider();

        // dbzConfig is reassigned above, so the lambdas below need an effectively final alias.
        PostgresConnectorConfig finalDbzConfig = dbzConfig;
        this.postgresDispatcher =
                new CDCPostgresDispatcher(
                        finalDbzConfig,
                        topicSelector,
                        schema,
                        queue,
                        finalDbzConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        metadataProvider,
                        new HeartbeatFactory<>(
                                dbzConfig,
                                topicSelector,
                                schemaNameAdjuster,
                                () ->
                                        new PostgresConnection(
                                                finalDbzConfig.getJdbcConfig(),
                                                PostgresConnection.CONNECTION_GENERAL),
                                exception -> {
                                    String sqlErrorId = exception.getSQLState();
                                    if (sqlErrorId == null) {
                                        // SQLException may carry no SQLState; switching on a
                                        // null String would throw a NullPointerException, so
                                        // treat it like any other unrecognized error.
                                        return;
                                    }
                                    switch (sqlErrorId) {
                                        case "57P01":
                                            // Postgres error admin_shutdown, see
                                            // https://www.postgresql.org/docs/12/errcodes-appendix.html
                                            throw new DebeziumException(
                                                    "Could not execute heartbeat action query (Error: "
                                                            + sqlErrorId
                                                            + ")",
                                                    exception);
                                        case "57P03":
                                            // Postgres error cannot_connect_now, see
                                            // https://www.postgresql.org/docs/12/errcodes-appendix.html
                                            throw new RetriableException(
                                                    "Could not execute heartbeat action query (Error: "
                                                            + sqlErrorId
                                                            + ")",
                                                    exception);
                                        default:
                                            // other SQL states are ignored
                                            break;
                                    }
                                }),
                        schemaNameAdjuster);

        ChangeEventSourceMetricsFactory<PostgresPartition> metricsFactory =
                new DefaultChangeEventSourceMetricsFactory<>();
        this.snapshotChangeEventSourceMetrics =
                metricsFactory.getSnapshotMetrics(taskContext, queue, metadataProvider);
    }