in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java [165:269]
public void build(StreamExecutionEnvironment env) throws Exception {
    // Assembles the MySQL -> Paimon database synchronization job on the given environment:
    // validates the configuration, resolves the MySQL schemas, makes sure a Paimon table
    // exists for each of them, then connects source, event parser and sink.

    // A table-name option is rejected here; the message points users to mysql-sync-table.
    checkArgument(
            !mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
            MySqlSourceOptions.TABLE_NAME.key()
                    + " cannot be set for mysql-sync-database. "
                    + "If you want to sync several MySQL tables into one Paimon table, "
                    + "use mysql-sync-table instead.");

    boolean caseSensitive = catalog.caseSensitive();
    if (!caseSensitive) {
        validateCaseInsensitive();
    }

    // NOTE(review): excludedTables is presumably filled by getMySqlSchemaList; it is only
    // read again below when the source table list is built. Confirm against the helper.
    List<Identifier> excludedTables = new ArrayList<>();
    List<MySqlSchema> discoveredSchemas =
            MySqlActionUtils.getMySqlSchemaList(
                    mySqlConfig, monitorTablePredication(), excludedTables);

    // The monitored set is taken from the schemas *before* any shard merging.
    monitoredTables =
            discoveredSchemas.stream()
                    .map(MySqlSchema::identifier)
                    .collect(Collectors.toList());

    List<MySqlSchema> mySqlSchemas;
    if (mergeShards) {
        mySqlSchemas = mergeShards(discoveredSchemas);
    } else {
        mySqlSchemas = discoveredSchemas;
    }
    checkArgument(
            !mySqlSchemas.isEmpty(),
            "No tables found in MySQL database "
                    + mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
                    + ", or MySQL database does not exist.");

    // NOTE(review): the 'true' flag presumably means "ignore if exists" -- confirm.
    catalog.createDatabase(database, true);

    TableNameConverter nameConverter =
            new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);

    List<FileStoreTable> fileStoreTables = new ArrayList<>();
    for (MySqlSchema sourceSchema : mySqlSchemas) {
        Identifier paimonId = buildPaimonIdentifier(nameConverter, sourceSchema);
        Schema derivedSchema =
                MySqlActionUtils.buildPaimonSchema(
                        sourceSchema,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        tableConfig,
                        caseSensitive);
        try {
            // Table already exists: keep it only if shouldMonitorTable accepts its schema.
            FileStoreTable existing = (FileStoreTable) catalog.getTable(paimonId);
            Supplier<String> mismatchMessage =
                    incompatibleMessage(existing.schema(), sourceSchema, paimonId);
            if (!shouldMonitorTable(existing.schema(), derivedSchema, mismatchMessage)) {
                unmonitor(sourceSchema);
            } else {
                fileStoreTables.add(existing);
            }
        } catch (Catalog.TableNotExistException e) {
            // Table is missing: create it from the MySQL-derived schema and load it back.
            catalog.createTable(paimonId, derivedSchema, false);
            FileStoreTable created = (FileStoreTable) catalog.getTable(paimonId);
            fileStoreTables.add(created);
        }
    }

    // Checked after the loop because unmonitor(...) may have been called for some schemas.
    Preconditions.checkState(
            !monitoredTables.isEmpty(),
            "No tables to be synchronized. Possible cause is the schemas of all tables in specified "
                    + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");

    MySqlSource<String> source =
            MySqlActionUtils.buildMySqlSource(mySqlConfig, buildTableList(excludedTables));

    // Fall back to the JVM default zone when no server time zone is configured.
    String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
    ZoneId serverZone =
            serverTimeZone != null ? ZoneId.of(serverTimeZone) : ZoneId.systemDefault();

    MySqlTableSchemaBuilder schemaBuilder =
            new MySqlTableSchemaBuilder(tableConfig, caseSensitive);

    // Fields are copied into locals so the factory lambda below captures only these
    // values rather than a reference to this action instance.
    Pattern includeFilter = this.includingPattern;
    Pattern excludeFilter = this.excludingPattern;
    Boolean tinyint1AsBool = mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
    EventParser.Factory<String> parserFactory =
            () ->
                    new MySqlDebeziumJsonEventParser(
                            serverZone,
                            caseSensitive,
                            nameConverter,
                            schemaBuilder,
                            includeFilter,
                            excludeFilter,
                            tinyint1AsBool);

    String targetDatabase = this.database;
    DatabaseSyncMode syncMode = this.mode;

    FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
            new FlinkCdcSyncDatabaseSinkBuilder<String>()
                    .withInput(
                            env.fromSource(
                                    source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
                    .withParserFactory(parserFactory)
                    .withDatabase(targetDatabase)
                    .withCatalogLoader(catalogLoader())
                    .withTables(fileStoreTables)
                    .withMode(syncMode);

    // Sink parallelism is optional; it is applied only when present in the table config.
    String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
    if (sinkParallelism != null) {
        sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
    }
    sinkBuilder.build();
}