public static String generateCreateTableDDL()

in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java [116:221]


    /**
     * Renders the {@code CREATE TABLE IF NOT EXISTS} statement for the given table schema.
     *
     * <p>Columns are emitted in this order: the key columns (in key order), the auto-partition
     * column (only when it is not already one of the keys), then every remaining field in the
     * iteration order of {@code schema.getFields()}. The column list is followed by the optional
     * UNIQUE KEY clause, the optional table comment, the optional AUTO PARTITION clause, the
     * DISTRIBUTED BY HASH / BUCKETS clause and the optional PROPERTIES list.
     *
     * @param schema description of the table to create
     * @return the generated DDL text
     * @throws CreateTableException if a key column or the partition column is missing from the
     *     schema's field map, or if an explicit bucket count is not positive
     */
    public static String generateCreateTableDDL(TableSchema schema) {
        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        sb.append(identifier(schema.getDatabase()))
                .append(".")
                .append(identifier(schema.getTable()))
                .append("(");

        Map<String, FieldSchema> fields = schema.getFields();
        List<String> keys = schema.getKeys();

        boolean hasPartition = schema.getPartitionInfo() != null;
        String partitionCol = hasPartition ? schema.getPartitionInfo().f0 : null;
        // A partition column that is also listed as a key must be emitted only once, both in
        // the column list and in the UNIQUE KEY clause.
        boolean partitionColIsKey = hasPartition && keys.contains(partitionCol);

        // append keys
        for (String key : keys) {
            if (!fields.containsKey(key)) {
                throw new CreateTableException("key " + key + " not found in column list");
            }
            // A key that doubles as the partition column receives the same flags the
            // standalone partition column is built with below (assumes the last flag marks
            // the auto-partition column; confirm against buildColumn).
            boolean isPartitionKey = partitionColIsKey && key != null && key.equals(partitionCol);
            buildColumn(sb, fields.get(key), true, isPartitionKey);
        }

        // append partition column, auto partition column must be in keys
        if (hasPartition && !partitionColIsKey) {
            FieldSchema partitionField = fields.get(partitionCol);
            if (partitionField == null) {
                // Fail the same way a missing key column does instead of handing null to
                // buildColumn.
                throw new CreateTableException(
                        "partition column " + partitionCol + " not found in column list");
            }
            buildColumn(sb, partitionField, true, true);
        }

        // append values
        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
            String columnName = entry.getKey();
            // skip key column
            if (keys.contains(columnName)) {
                continue;
            }
            // skip partition column
            if (hasPartition && columnName.equals(partitionCol)) {
                continue;
            }
            buildColumn(sb, entry.getValue(), false, false);
        }
        // Drop the trailing character left behind by the last buildColumn call
        // (presumably the column separator; buildColumn is defined elsewhere).
        sb.deleteCharAt(sb.length() - 1);
        sb.append(" ) ");

        // append uniq model
        if (DataModel.UNIQUE.equals(schema.getModel())) {
            sb.append(schema.getModel().name())
                    .append(" KEY(")
                    .append(String.join(",", identifier(keys)));

            if (hasPartition && !partitionColIsKey) {
                // Only separate with a comma when key columns precede the partition column.
                if (!keys.isEmpty()) {
                    sb.append(",");
                }
                sb.append(identifier(partitionCol));
            }

            sb.append(")");
        }

        // append table comment
        if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
            sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
        }

        // append partition info if exists
        if (hasPartition) {
            sb.append(" AUTO PARTITION BY RANGE ")
                    .append(
                            String.format(
                                    "(date_trunc(`%s`, '%s'))",
                                    partitionCol, schema.getPartitionInfo().f1))
                    .append("()");
        }

        // append distribute key
        sb.append(" DISTRIBUTED BY HASH(")
                .append(String.join(",", identifier(schema.getDistributeKeys())))
                .append(")");

        if (schema.getTableBuckets() != null) {
            int bucketsNum = schema.getTableBuckets();
            if (bucketsNum <= 0) {
                throw new CreateTableException("The number of buckets must be positive.");
            }
            sb.append(" BUCKETS ").append(bucketsNum);
        } else {
            sb.append(" BUCKETS AUTO ");
        }

        // append properties; the clause is omitted entirely when there are none
        Map<String, String> properties = schema.getProperties();
        if (properties != null && !properties.isEmpty()) {
            sb.append(" PROPERTIES (");
            boolean first = true;
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                if (!first) {
                    sb.append(",");
                }
                sb.append(quoteProperties(entry.getKey()))
                        .append("=")
                        .append(quoteProperties(entry.getValue()));
                first = false;
            }
            sb.append(")");
        }
        return sb.toString();
    }