in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java [39:73]
/**
 * Builds the schema description of one source table from JDBC metadata.
 *
 * <p>Every column reported by {@code DatabaseMetaData.getColumns} for the table is converted to
 * a Doris type through {@code convertToDorisType} and stored in {@code fields}, keyed by column
 * name, in the order the driver returns the rows. The primary-key column names reported by
 * {@code DatabaseMetaData.getPrimaryKeys} are then collected into {@code primaryKeys}.
 *
 * @param metaData JDBC metadata handle used for both lookups
 * @param databaseName passed as the catalog argument of the metadata lookups
 * @param schemaName passed as the schema argument of the metadata lookups
 * @param tableName table to describe; {@code null} is forwarded to the driver unchanged
 * @param tableComment comment recorded for the table
 * @throws Exception if a metadata lookup fails; anything thrown by {@code convertToDorisType}
 *     propagates as well
 */
public SourceSchema(
        DatabaseMetaData metaData,
        String databaseName,
        String schemaName,
        String tableName,
        String tableComment)
        throws Exception {
    this.databaseName = databaseName;
    this.tableName = tableName;
    this.tableComment = tableComment;

    fields = new LinkedHashMap<>();
    try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
        while (rs.next()) {
            // getColumns() interprets the table argument as a LIKE-style pattern, so '_' and
            // '%' inside a real table name also match other tables (e.g. "order_1" matches
            // "orderA1"). Drop rows that belong to a different table so their columns are not
            // merged into this schema. The comparison ignores case and tolerates a missing
            // TABLE_NAME so that drivers which fold identifier case keep working as before.
            String reportedTable = rs.getString("TABLE_NAME");
            if (tableName != null
                    && reportedTable != null
                    && !reportedTable.equalsIgnoreCase(tableName)) {
                continue;
            }

            String fieldName = rs.getString("COLUMN_NAME");
            String comment = rs.getString("REMARKS");
            String fieldType = rs.getString("TYPE_NAME");

            // getInt() returns 0 for SQL NULL; wasNull() distinguishes "absent" from a real 0.
            Integer precision = rs.getInt("COLUMN_SIZE");
            if (rs.wasNull()) {
                precision = null;
            }
            Integer scale = rs.getInt("DECIMAL_DIGITS");
            if (rs.wasNull()) {
                scale = null;
            }

            String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
            fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
        }
    }

    primaryKeys = new ArrayList<>();
    // Unlike getColumns(), getPrimaryKeys() takes a literal table name, so no filtering is
    // needed here.
    // NOTE(review): JDBC specifies that these rows are ordered by COLUMN_NAME, not KEY_SEQ, so
    // a composite key may not come back in declaration order - confirm whether callers rely
    // on the key order.
    try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
        while (rs.next()) {
            String fieldName = rs.getString("COLUMN_NAME");
            primaryKeys.add(fieldName);
        }
    }
}