in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java [202:311]
/**
 * Validates the builder's settings, fills in defaults for the optional ones that were left
 * unset, and creates the source function.
 *
 * <p>Defaults applied when the corresponding setting is {@code null}: startup options {@code
 * StartupOptions.initial()}, compatible mode {@code "mysql"}, JDBC driver {@code
 * "com.mysql.cj.jdbc.Driver"}, connect timeout 30 seconds, and the JVM's default time zone as
 * server time zone.
 *
 * <p>Either both {@code databaseName} and {@code tableName}, or {@code tableList}, must be
 * configured. Unless the startup options are snapshot-only, {@code logProxyHost}, {@code
 * logProxyPort} and {@code tenantName} are required as well, and an {@link ObReaderConfig} is
 * assembled from the builder's settings; otherwise a {@code null} reader config is passed on.
 *
 * @return a new {@link OceanBaseRichSourceFunction} built from the validated settings
 * @throws IllegalArgumentException if only one of {@code databaseName} / {@code tableName} is
 *     set
 * @throws UnsupportedOperationException if the startup mode is not one of SNAPSHOT, INITIAL,
 *     LATEST_OFFSET or TIMESTAMP
 */
public SourceFunction<T> build() {
    // Required connection settings; a failed check is raised by checkNotNull with the message.
    checkNotNull(username, "username shouldn't be null");
    checkNotNull(password, "password shouldn't be null");
    checkNotNull(hostname, "hostname shouldn't be null");
    checkNotNull(port, "port shouldn't be null");

    // Defaults for optional settings.
    if (startupOptions == null) {
        startupOptions = StartupOptions.initial();
    }
    if (compatibleMode == null) {
        compatibleMode = "mysql";
    }
    if (jdbcDriver == null) {
        jdbcDriver = "com.mysql.cj.jdbc.Driver";
    }
    if (connectTimeout == null) {
        connectTimeout = Duration.ofSeconds(30);
    }
    if (serverTimeZone == null) {
        serverTimeZone = ZoneId.systemDefault().getId();
    }

    switch (startupOptions.startupMode) {
        case SNAPSHOT:
            // No start timestamp is needed: no reader config is created for this mode below.
            break;
        case INITIAL:
        case LATEST_OFFSET:
            // Any previously configured timestamp is overridden with 0 for these modes.
            startupTimestamp = 0L;
            break;
        case TIMESTAMP:
            checkNotNull(
                    startupTimestamp,
                    "startupTimestamp shouldn't be null on startup mode 'timestamp'");
            break;
        default:
            throw new UnsupportedOperationException(
                    startupOptions.startupMode + " mode is not supported.");
    }

    // Table selection: database-name and table-name come as a pair; table-list is the
    // alternative when neither of them is given.
    if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
        if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
            throw new IllegalArgumentException(
                    "'database-name' and 'table-name' should be configured at the same time");
        }
    } else {
        checkNotNull(
                tableList,
                "'database-name', 'table-name' or 'table-list' should be configured");
    }

    // Stays null for snapshot-only startup.
    ObReaderConfig obReaderConfig = null;
    if (!startupOptions.isSnapshotOnly()) {
        // Name the missing setting so a misconfiguration can be diagnosed from the error.
        checkNotNull(logProxyHost, "logProxyHost shouldn't be null");
        checkNotNull(logProxyPort, "logProxyPort shouldn't be null");
        checkNotNull(tenantName, "tenantName shouldn't be null");

        obReaderConfig = new ObReaderConfig();
        // Optional settings are only applied when non-empty, leaving ObReaderConfig's own
        // values untouched otherwise.
        if (StringUtils.isNotEmpty(rsList)) {
            obReaderConfig.setRsList(rsList);
        }
        if (StringUtils.isNotEmpty(configUrl)) {
            obReaderConfig.setClusterUrl(configUrl);
        }
        if (StringUtils.isNotEmpty(workingMode)) {
            obReaderConfig.setWorkingMode(workingMode);
        }
        obReaderConfig.setUsername(username);
        obReaderConfig.setPassword(password);
        obReaderConfig.setStartTimestamp(startupTimestamp);
        // The zone is passed as a UTC offset such as "+08:00" (pattern "xxx"). The offset is
        // resolved for the current instant, i.e. it is fixed at build time.
        obReaderConfig.setTimezone(
                DateTimeFormatter.ofPattern("xxx")
                        .format(
                                ZoneId.of(serverTimeZone)
                                        .getRules()
                                        .getOffset(Instant.now())));
        if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
            // Copy the entries into a String-to-String map, as setExtraConfigs is given one.
            Map<String, String> extraConfigs = new HashMap<>();
            obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
            obReaderConfig.setExtraConfigs(extraConfigs);
        }
    }

    return new OceanBaseRichSourceFunction<>(
            startupOptions,
            username,
            password,
            tenantName,
            databaseName,
            tableName,
            tableList,
            serverTimeZone,
            connectTimeout,
            hostname,
            port,
            compatibleMode,
            jdbcDriver,
            jdbcProperties,
            logProxyHost,
            logProxyPort,
            logProxyClientId,
            obReaderConfig,
            debeziumProperties,
            deserializer);
}