private Schema createTableSchema()

in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java [250:278]


    private Schema createTableSchema(String databaseName, String tableName) {
        try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
                connectionOptions.getUsername(),
                connectionOptions.getPassword())) {
            PreparedStatement ps =
                    conn.prepareStatement(
                            String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName));

            List<String> columnNames = new ArrayList<>();
            List<DataType> columnTypes = new ArrayList<>();
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                String columnType = resultSet.getString("DATA_TYPE");
                long columnSize = resultSet.getLong("COLUMN_SIZE");
                long columnDigit = resultSet.getLong("DECIMAL_DIGITS");

                DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit);
                columnNames.add(columnName);
                columnTypes.add(flinkType);
            }
            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, columnTypes);
            Schema tableSchema = schemaBuilder.build();
            return tableSchema;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting catalog %s database %s table %s", getName(), databaseName, tableName), e);
        }
    }