in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java [250:278]
private Schema createTableSchema(String databaseName, String tableName) {
try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
connectionOptions.getUsername(),
connectionOptions.getPassword())) {
PreparedStatement ps =
conn.prepareStatement(
String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName));
List<String> columnNames = new ArrayList<>();
List<DataType> columnTypes = new ArrayList<>();
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
String columnType = resultSet.getString("DATA_TYPE");
long columnSize = resultSet.getLong("COLUMN_SIZE");
long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit);
columnNames.add(columnName);
columnTypes.add(flinkType);
}
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, columnTypes);
Schema tableSchema = schemaBuilder.build();
return tableSchema;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting catalog %s database %s table %s", getName(), databaseName, tableName), e);
}
}