in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java [143:211]
public String buildCreateTableDDL(TableSchema schema) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
.append(identifier(schema.getTable()))
.append("(");
Map<String, FieldSchema> fields = schema.getFields();
List<String> keys = schema.getKeys();
//append keys
for(String key : keys){
if(!fields.containsKey(key)){
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
buildColumn(sb, field, true);
}
//append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
if(keys.contains(entry.getKey())){
continue;
}
FieldSchema field = entry.getValue();
buildColumn(sb, field, false);
}
sb = sb.deleteCharAt(sb.length() -1);
sb.append(" ) ");
//append uniq model
if(DataModel.UNIQUE.equals(schema.getModel())){
sb.append(schema.getModel().name())
.append(" KEY(")
.append(String.join(",", identifier(schema.getKeys())))
.append(")");
}
//append table comment
if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
sb.append(" COMMENT '")
.append(schema.getTableComment())
.append("' ");
}
//append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",", identifier(schema.getDistributeKeys())))
.append(") BUCKETS AUTO ");
//append properties
int index = 0;
for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
if (index == 0) {
sb.append(" PROPERTIES (");
}
if (index > 0) {
sb.append(",");
}
sb.append(quoteProperties(entry.getKey()))
.append("=")
.append(quoteProperties(entry.getValue()));
index++;
if (index == schema.getProperties().size()) {
sb.append(")");
}
}
return sb.toString();
}