in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java [127:194]
/**
 * Configures the given {@code DatabaseSync} from the command-line parameters and, when the
 * build step reports that there is something to run, submits the Flink job.
 *
 * <p>When the class-level {@code flinkEnvironmentForTesting} is set, that environment is used
 * and the job is submitted asynchronously, with the resulting client stored in {@code
 * jobClient}; otherwise the default execution environment is used and the call blocks in
 * {@code execute}.
 *
 * @param params parsed command-line parameters; must contain the sink configuration option
 * @param databaseSync the synchronizer to configure and build
 * @param config source-side configuration handed to the synchronizer; also consulted for the
 *     database name when a default job name has to be generated
 * @param sourceConnector the source connector, used only for its name in the default job name
 * @throws IllegalArgumentException if the sink configuration option is absent
 * @throws Exception if building the synchronizer or executing the job fails
 */
private static void syncDatabase(
        MultipleParameterTool params,
        DatabaseSync databaseSync,
        Configuration config,
        SourceConnector sourceConnector)
        throws Exception {
    String jobName = params.get(DatabaseSyncConfig.JOB_NAME);
    String database = params.get(DatabaseSyncConfig.DATABASE);
    String tablePrefix = params.get(DatabaseSyncConfig.TABLE_PREFIX);
    String tableSuffix = params.get(DatabaseSyncConfig.TABLE_SUFFIX);
    String includingTables = params.get(DatabaseSyncConfig.INCLUDING_TABLES);
    String excludingTables = params.get(DatabaseSyncConfig.EXCLUDING_TABLES);
    String multiToOneOrigin = params.get(DatabaseSyncConfig.MULTI_TO_ONE_ORIGIN);
    String multiToOneTarget = params.get(DatabaseSyncConfig.MULTI_TO_ONE_TARGET);
    String schemaChangeMode = params.get(DatabaseSyncConfig.SCHEMA_CHANGE_MODE);

    // These options are flags: only their presence matters, not their value.
    boolean createTableOnly = params.has(DatabaseSyncConfig.CREATE_TABLE_ONLY);
    boolean ignoreDefaultValue = params.has(DatabaseSyncConfig.IGNORE_DEFAULT_VALUE);
    boolean ignoreIncompatible = params.has(DatabaseSyncConfig.IGNORE_INCOMPATIBLE);
    boolean singleSink = params.has(DatabaseSyncConfig.SINGLE_SINK);

    // Name the missing option so the failure is actionable instead of a bare
    // IllegalArgumentException with no message.
    Preconditions.checkArgument(
            params.has(DatabaseSyncConfig.SINK_CONF),
            "Missing required parameter: " + DatabaseSyncConfig.SINK_CONF);

    Map<String, String> sinkMap = getConfigMap(params, DatabaseSyncConfig.SINK_CONF);
    DorisTableConfig tableConfig =
            new DorisTableConfig(getConfigMap(params, DatabaseSyncConfig.TABLE_CONF));
    Configuration sinkConfig = Configuration.fromMap(sinkMap);

    // Tests may inject their own environment; otherwise use the default one.
    boolean testing = Objects.nonNull(flinkEnvironmentForTesting);
    StreamExecutionEnvironment env =
            testing
                    ? flinkEnvironmentForTesting
                    : StreamExecutionEnvironment.getExecutionEnvironment();

    databaseSync
            .setEnv(env)
            .setDatabase(database)
            .setConfig(config)
            .setTablePrefix(tablePrefix)
            .setTableSuffix(tableSuffix)
            .setIncludingTables(includingTables)
            .setExcludingTables(excludingTables)
            .setMultiToOneOrigin(multiToOneOrigin)
            .setMultiToOneTarget(multiToOneTarget)
            .setIgnoreDefaultValue(ignoreDefaultValue)
            .setSinkConfig(sinkConfig)
            .setTableConfig(tableConfig)
            .setCreateTableOnly(createTableOnly)
            .setSingleSink(singleSink)
            .setIgnoreIncompatible(ignoreIncompatible)
            .setSchemaChangeMode(schemaChangeMode)
            .create();

    boolean needExecute = databaseSync.build();
    if (!needExecute) {
        // create table only
        return;
    }

    // Fall back to a generated name when the caller did not supply a usable one.
    if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
        jobName =
                String.format(
                        "%s-Doris Sync Database: %s",
                        sourceConnector.getConnectorName(),
                        config.getString(
                                DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
    }

    if (testing) {
        // Non-blocking submission so the test can drive the job through jobClient.
        // The job name is passed here too, keeping it consistent with the blocking path.
        jobClient = env.executeAsync(jobName);
    } else {
        env.execute(jobName);
    }
}