in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java [116:221]
/**
 * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the given table schema.
 *
 * <p>Columns are emitted in this order: the key columns (in the order returned by {@code
 * schema.getKeys()}), then the auto-partition column if it is not already one of the keys, then
 * every remaining column in the iteration order of {@code schema.getFields()}. After the column
 * list the statement carries, when applicable, the UNIQUE key clause, the table comment, the
 * auto-partition clause, the hash distribution with its bucket setting, and the table
 * properties.
 *
 * @param schema the table description to render
 * @return the generated DDL statement
 * @throws CreateTableException if the column list is empty, if a key column or the partition
 *     column is not present in the column list, or if the configured bucket count is not
 *     positive
 */
public static String generateCreateTableDDL(TableSchema schema) {
    Map<String, FieldSchema> fields = schema.getFields();
    if (fields == null || fields.isEmpty()) {
        // Without this guard the trailing-separator removal below would eat the opening "(".
        throw new CreateTableException("column list must not be empty");
    }
    List<String> keys = schema.getKeys();

    boolean hasPartition = schema.getPartitionInfo() != null;
    String partitionCol = hasPartition ? schema.getPartitionInfo().f0 : null;
    if (hasPartition && (partitionCol == null || !fields.containsKey(partitionCol))) {
        // Fail with a descriptive error instead of passing a null field to buildColumn.
        throw new CreateTableException(
                "partition column " + partitionCol + " not found in column list");
    }
    // When the partition column is already a key it must be emitted only once, at its key
    // position, otherwise both the column list and the KEY clause would contain it twice.
    boolean partitionInKeys = hasPartition && keys.contains(partitionCol);

    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
    sb.append(identifier(schema.getDatabase()))
            .append(".")
            .append(identifier(schema.getTable()))
            .append("(");

    // append keys
    for (String key : keys) {
        if (!fields.containsKey(key)) {
            throw new CreateTableException("key " + key + " not found in column list");
        }
        boolean isPartitionKey = hasPartition && partitionCol.equals(key);
        buildColumn(sb, fields.get(key), true, isPartitionKey);
    }

    // append the auto partition column as an additional key column, unless already emitted
    if (hasPartition && !partitionInKeys) {
        buildColumn(sb, fields.get(partitionCol), true, true);
    }

    // append value columns, i.e. everything that is neither a key nor the partition column
    for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
        String columnName = entry.getKey();
        if (keys.contains(columnName)) {
            continue;
        }
        if (hasPartition && columnName.equals(partitionCol)) {
            continue;
        }
        buildColumn(sb, entry.getValue(), false, false);
    }

    // Drop the last character written by buildColumn (presumably the trailing column
    // separator -- confirm against buildColumn) before closing the column list.
    sb.deleteCharAt(sb.length() - 1);
    sb.append(" ) ");

    // append uniq model
    if (DataModel.UNIQUE.equals(schema.getModel())) {
        sb.append(schema.getModel().name())
                .append(" KEY(")
                .append(String.join(",", identifier(keys)));
        if (hasPartition && !partitionInKeys) {
            // Only separate with a comma when there is something before the partition column.
            if (!keys.isEmpty()) {
                sb.append(",");
            }
            sb.append(identifier(partitionCol));
        }
        sb.append(")");
    }

    // append table comment
    if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
        sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
    }

    // append partition info if exists
    if (hasPartition) {
        sb.append(" AUTO PARTITION BY RANGE ")
                .append(
                        String.format(
                                "(date_trunc(`%s`, '%s'))",
                                partitionCol, schema.getPartitionInfo().f1))
                .append("()");
    }

    // append distribute key
    sb.append(" DISTRIBUTED BY HASH(")
            .append(String.join(",", identifier(schema.getDistributeKeys())))
            .append(")");

    // append bucket setting: explicit positive count, or AUTO when none is configured
    if (schema.getTableBuckets() != null) {
        int bucketsNum = schema.getTableBuckets();
        if (bucketsNum <= 0) {
            throw new CreateTableException("The number of buckets must be positive.");
        }
        sb.append(" BUCKETS ").append(bucketsNum);
    } else {
        sb.append(" BUCKETS AUTO ");
    }

    // append properties; the clause is omitted entirely when there are none
    Map<String, String> properties = schema.getProperties();
    if (properties != null && !properties.isEmpty()) {
        sb.append(" PROPERTIES (");
        boolean first = true;
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (!first) {
                sb.append(",");
            }
            sb.append(quoteProperties(entry.getKey()))
                    .append("=")
                    .append(quoteProperties(entry.getValue()));
            first = false;
        }
        sb.append(")");
    }
    return sb.toString();
}