in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java [267:316]
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
String databaseName = tablePath.getDatabaseName();
try (Connection conn =
DriverManager.getConnection(getDatabaseUrl(databaseName), connectionProperties)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<UniqueConstraint> primaryKey =
getPrimaryKey(
metaData,
databaseName,
getSchemaName(tablePath),
getTableName(tablePath));
PreparedStatement ps =
conn.prepareStatement(
String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));
ResultSetMetaData resultSetMetaData = ps.getMetaData();
String[] columnNames = new String[resultSetMetaData.getColumnCount()];
DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
columnNames[i - 1] = resultSetMetaData.getColumnName(i);
types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
types[i - 1] = types[i - 1].notNull();
}
}
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
primaryKey.ifPresent(
pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
Schema tableSchema = schemaBuilder.build();
return CatalogTable.newBuilder()
.schema(tableSchema)
.options(getOptions(tablePath))
.build();
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}