in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java [241:293]
/**
 * Returns the table at {@code tablePath}, with a schema derived from JDBC metadata.
 *
 * <p>Opens a connection to {@code baseUrl + databaseName}, looks up the primary key via
 * {@code getPrimaryKey}, and reads column names, types and nullability from the metadata of a
 * prepared {@code SELECT *} statement. The returned table carries the connector identifier,
 * URL, username, password and table name as options.
 *
 * @param tablePath path of the table to look up
 * @return the table built from the discovered schema and connection options
 * @throws TableNotExistException if {@code tableExists(tablePath)} is false
 * @throws CatalogException if connecting or reading the metadata fails for any reason
 */
public CatalogBaseTable getTable(ObjectPath tablePath)
        throws TableNotExistException, CatalogException {
    if (!tableExists(tablePath)) {
        throw new TableNotExistException(getName(), tablePath);
    }

    String databaseName = tablePath.getDatabaseName();
    String dbUrl = baseUrl + databaseName;

    try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
        DatabaseMetaData metaData = conn.getMetaData();
        Optional<UniqueConstraint> primaryKey =
                getPrimaryKey(
                        metaData,
                        databaseName,
                        getSchemaName(tablePath),
                        getTableName(tablePath));

        String schemaTableName = getSchemaTableName(tablePath);
        String[] columnNames;
        DataType[] types;

        // The statement is only prepared, never executed: its metadata alone describes the
        // columns. It is closed explicitly here rather than relying on the connection to
        // release it.
        try (PreparedStatement ps =
                conn.prepareStatement(String.format("SELECT * FROM %s;", schemaTableName))) {
            ResultSetMetaData resultSetMetaData = ps.getMetaData();
            int columnCount = resultSetMetaData.getColumnCount();
            columnNames = new String[columnCount];
            types = new DataType[columnCount];

            // JDBC column indexes are 1-based; the arrays are 0-based.
            for (int i = 1; i <= columnCount; i++) {
                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
                DataType type = fromJDBCType(tablePath, resultSetMetaData, i);
                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
                    type = type.notNull();
                }
                types[i - 1] = type;
            }
        }

        Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
        primaryKey.ifPresent(
                pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
        Schema tableSchema = schemaBuilder.build();

        Map<String, String> props = new HashMap<>();
        props.put(CONNECTOR.key(), IDENTIFIER);
        props.put(URL.key(), dbUrl);
        props.put(USERNAME.key(), username);
        props.put(PASSWORD.key(), pwd);
        props.put(TABLE_NAME.key(), schemaTableName);

        return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
    } catch (Exception e) {
        throw new CatalogException(
                String.format("Failed getting table %s", tablePath.getFullName()), e);
    }
}