in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java [134:218]
public void build(StreamExecutionEnvironment env) throws Exception {
checkArgument(
mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
String.format(
"mysql-conf [%s] must be specified.", MySqlSourceOptions.TABLE_NAME.key()));
String tableList =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME)
+ "\\."
+ mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);
boolean caseSensitive = catalog.caseSensitive();
if (!caseSensitive) {
validateCaseInsensitive();
}
MySqlSchema mySqlSchema =
MySqlActionUtils.getMySqlSchemaList(
mySqlConfig, monitorTablePredication(), new ArrayList<>())
.stream()
.reduce(MySqlSchema::merge)
.orElseThrow(
() ->
new RuntimeException(
"No table satisfies the given database name and table name"));
catalog.createDatabase(database, true);
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
List<ComputedColumn> computedColumns =
buildComputedColumns(computedColumnArgs, mySqlSchema.typeMapping());
Schema fromMySql =
MySqlActionUtils.buildPaimonSchema(
mySqlSchema,
partitionKeys,
primaryKeys,
computedColumns,
tableConfig,
caseSensitive);
try {
table = (FileStoreTable) catalog.getTable(identifier);
if (computedColumns.size() > 0) {
List<String> computedFields =
computedColumns.stream()
.map(ComputedColumn::columnName)
.collect(Collectors.toList());
List<String> fieldNames = table.schema().fieldNames();
checkArgument(
new HashSet<>(fieldNames).containsAll(computedFields),
" Exists Table should contain all computed columns %s, but are %s.",
computedFields,
fieldNames);
}
MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);
Boolean convertTinyint1ToBool = mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
EventParser.Factory<String> parserFactory =
() ->
new MySqlDebeziumJsonEventParser(
zoneId, caseSensitive, computedColumns, convertTinyint1ToBool);
CdcSinkBuilder<String> sinkBuilder =
new CdcSinkBuilder<String>()
.withInput(
env.fromSource(
source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
.withParserFactory(parserFactory)
.withTable(table)
.withIdentifier(identifier)
.withCatalogLoader(catalogLoader());
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
sinkBuilder.build();
}