in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java [45:67]
/**
 * Creates the JDBC catalog implementation that matches the dialect resolved for the given
 * base URL.
 *
 * <p>The dialect is obtained from {@code JdbcDialectLoader.load(baseUrl, userClassLoader)}.
 * Its runtime type then selects the catalog class; all arguments are passed through to that
 * catalog's constructor unchanged.
 *
 * @param userClassLoader class loader handed to the dialect loader and to the created catalog
 * @param catalogName name forwarded to the created catalog
 * @param defaultDatabase default database forwarded to the created catalog
 * @param username user name forwarded to the created catalog
 * @param pwd password forwarded to the created catalog
 * @param baseUrl URL used to resolve the dialect; also forwarded to the created catalog
 * @return a {@code PostgresCatalog}, {@code CrateDBCatalog} or {@code MySqlCatalog},
 *     depending on the resolved dialect
 * @throws UnsupportedOperationException if the resolved dialect is none of
 *     {@code PostgresDialect}, {@code CrateDBDialect} or {@code MySqlDialect}
 */
public static AbstractJdbcCatalog createCatalog(
        ClassLoader userClassLoader,
        String catalogName,
        String defaultDatabase,
        String username,
        String pwd,
        String baseUrl) {
    final JdbcDialect resolvedDialect = JdbcDialectLoader.load(baseUrl, userClassLoader);

    // The checks are evaluated in this fixed order; the first matching dialect type wins.
    if (resolvedDialect instanceof PostgresDialect) {
        return new PostgresCatalog(
                userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
    }

    if (resolvedDialect instanceof CrateDBDialect) {
        return new CrateDBCatalog(
                userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
    }

    if (resolvedDialect instanceof MySqlDialect) {
        return new MySqlCatalog(
                userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
    }

    // No catalog implementation is wired up for any other dialect.
    final String message =
            String.format("Catalog for '%s' is not supported yet.", resolvedDialect);
    throw new UnsupportedOperationException(message);
}