public SourceFunction build()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java [202:311]


        /**
         * Validates the configured settings, fills in defaults for unset optional ones, and creates
         * the source function.
         *
         * <p>Defaults applied when the corresponding setting is {@code null}: startup options
         * {@code StartupOptions.initial()}, compatible mode {@code "mysql"}, JDBC driver {@code
         * "com.mysql.cj.jdbc.Driver"}, connect timeout 30 seconds, and server time zone the JVM's
         * system default zone. The defaults are written back to this builder's fields, and the
         * startup timestamp is overwritten with {@code 0} for the {@code INITIAL} and {@code
         * LATEST_OFFSET} startup modes.
         *
         * <p>Username, password, hostname and port are always required. Either both the database
         * name and the table name must be set, or (when neither is set) the table list must be
         * non-null. Unless the startup options are snapshot-only, the log proxy host, log proxy
         * port and tenant name are required as well, and an {@code ObReaderConfig} is assembled for
         * the log reader; for snapshot-only startup the reader config passed on is {@code null}.
         * Missing required settings are rejected by {@code checkNotNull}.
         *
         * @return a new {@code OceanBaseRichSourceFunction} built from the current settings
         * @throws IllegalArgumentException if only one of database name and table name is set
         * @throws UnsupportedOperationException if the startup mode is not {@code SNAPSHOT}, {@code
         *     INITIAL}, {@code LATEST_OFFSET} or {@code TIMESTAMP}
         */
        public SourceFunction<T> build() {
            checkNotNull(username, "username shouldn't be null");
            checkNotNull(password, "password shouldn't be null");
            checkNotNull(hostname, "hostname shouldn't be null");
            checkNotNull(port, "port shouldn't be null");

            if (startupOptions == null) {
                startupOptions = StartupOptions.initial();
            }
            if (compatibleMode == null) {
                compatibleMode = "mysql";
            }
            if (jdbcDriver == null) {
                jdbcDriver = "com.mysql.cj.jdbc.Driver";
            }

            if (connectTimeout == null) {
                connectTimeout = Duration.ofSeconds(30);
            }

            if (serverTimeZone == null) {
                serverTimeZone = ZoneId.systemDefault().getId();
            }

            switch (startupOptions.startupMode) {
                case SNAPSHOT:
                    // No reader config is built for snapshot-only startup, so the timestamp is
                    // left untouched.
                    break;
                case INITIAL:
                case LATEST_OFFSET:
                    // Any previously configured timestamp is discarded for these modes.
                    startupTimestamp = 0L;
                    break;
                case TIMESTAMP:
                    checkNotNull(
                            startupTimestamp,
                            "startupTimestamp shouldn't be null on startup mode 'timestamp'");
                    break;
                default:
                    throw new UnsupportedOperationException(
                            startupOptions.startupMode + " mode is not supported.");
            }

            // Tables are selected either by the database-name/table-name pair or by table-list.
            if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
                if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
                    throw new IllegalArgumentException(
                            "'database-name' and 'table-name' should be configured at the same time");
                }
            } else {
                checkNotNull(
                        tableList,
                        "'database-name', 'table-name' or 'table-list' should be configured");
            }

            ObReaderConfig obReaderConfig = null;

            if (!startupOptions.isSnapshotOnly()) {
                // These settings are only needed when the source is not snapshot-only; name the
                // missing one so a misconfiguration is easy to diagnose.
                checkNotNull(logProxyHost, "logProxyHost shouldn't be null");
                checkNotNull(logProxyPort, "logProxyPort shouldn't be null");
                checkNotNull(tenantName, "tenantName shouldn't be null");

                obReaderConfig = new ObReaderConfig();
                if (StringUtils.isNotEmpty(rsList)) {
                    obReaderConfig.setRsList(rsList);
                }
                if (StringUtils.isNotEmpty(configUrl)) {
                    obReaderConfig.setClusterUrl(configUrl);
                }
                if (StringUtils.isNotEmpty(workingMode)) {
                    obReaderConfig.setWorkingMode(workingMode);
                }
                obReaderConfig.setUsername(username);
                obReaderConfig.setPassword(password);
                obReaderConfig.setStartTimestamp(startupTimestamp);
                // The reader is given a fixed UTC offset ("+HH:mm" form) rather than a zone id.
                // The offset is the one in effect for the server time zone at the moment build()
                // runs.
                obReaderConfig.setTimezone(
                        DateTimeFormatter.ofPattern("xxx")
                                .format(
                                        ZoneId.of(serverTimeZone)
                                                .getRules()
                                                .getOffset(Instant.now())));

                if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
                    // Copy into a String-to-String map, which is what setExtraConfigs is given.
                    Map<String, String> extraConfigs = new HashMap<>();
                    obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
                    obReaderConfig.setExtraConfigs(extraConfigs);
                }
            }

            return new OceanBaseRichSourceFunction<>(
                    startupOptions,
                    username,
                    password,
                    tenantName,
                    databaseName,
                    tableName,
                    tableList,
                    serverTimeZone,
                    connectTimeout,
                    hostname,
                    port,
                    compatibleMode,
                    jdbcDriver,
                    jdbcProperties,
                    logProxyHost,
                    logProxyPort,
                    logProxyClientId,
                    obReaderConfig,
                    debeziumProperties,
                    deserializer);
        }