in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java [71:97]
public List<SourceSchema> getSchemaList() throws Exception {
String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
List<SourceSchema> schemaList = new ArrayList<>();
try (Connection conn = getConnection()) {
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet tables =
metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) {
while (tables.next()) {
String tableName = tables.getString("TABLE_NAME");
String tableComment = tables.getString("REMARKS");
if (!isSyncNeeded(tableName)) {
continue;
}
SourceSchema sourceSchema =
new MysqlSchema(metaData, databaseName, tableName, tableComment);
if (sourceSchema.primaryKeys.size() > 0) {
//Only sync tables with primary keys
schemaList.add(sourceSchema);
} else {
LOG.warn("table {} has no primary key, skip", tableName);
System.out.println("table " + tableName + " has no primary key, skip.");
}
}
}
}
return schemaList;
}