in flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalog.java [181:207]
public List<String> listTables(String databaseName)
        throws DatabaseNotExistException, CatalogException {
    // Lists the tables of the given database: collects every schema that is not one of the
    // builtin schemas and delegates the per-schema table lookup to getPureTables.
    //
    // Fails the precondition check when databaseName is blank, throws
    // DatabaseNotExistException when databaseExists(databaseName) is false, and wraps any
    // failure while connecting or querying in a CatalogException (original cause preserved).
    Preconditions.checkState(
            StringUtils.isNotBlank(databaseName), "Database name must not be blank.");

    if (!databaseExists(databaseName)) {
        throw new DatabaseNotExistException(getName(), databaseName);
    }

    // Resolved outside the try block on purpose: a failure here is not wrapped below.
    final String dbUrl = getDatabaseUrl(databaseName);
    final String schemaQuery = "SELECT schema_name FROM information_schema.schemata;";

    try (Connection connection = DriverManager.getConnection(dbUrl, connectionProperties)) {
        // Step 1: read column 1 (schema_name) of every row, keeping only the schemas that
        // are not contained in getBuiltinSchemas().
        final List<String> userSchemas;
        try (PreparedStatement schemaStatement = connection.prepareStatement(schemaQuery)) {
            userSchemas =
                    extractColumnValuesByStatement(
                            schemaStatement,
                            1,
                            schemaName -> !getBuiltinSchemas().contains(schemaName));
        }

        // Step 2: the schema statement is closed by now; the same connection is reused to
        // look up the tables of the remaining schemas.
        return getPureTables(connection, userSchemas);
    } catch (Exception e) {
        final String message =
                String.format("Failed to list tables for database %s", databaseName);
        throw new CatalogException(message, e);
    }
}