public String getTableDDLManually()

in holo-shipper/src/main/java/com/alibaba/hologres/shipper/holo/HoloTable.java [72:235]


    /**
     * Rebuilds the DDL script of this table by querying the catalog directly.
     *
     * <p>The returned script is wrapped in {@code BEGIN; ... END;} and contains, in order:
     * the {@code CREATE TABLE} statement, the {@code set_table_property} calls (omitted for a
     * partition child, which is created with {@code PARTITION OF} and no column list),
     * the table comment, the owner assignment and the column comments.
     *
     * <p>Values read from the catalog are escaped before being embedded in the script:
     * single quotes are doubled inside string literals, and column/owner names are emitted as
     * double-quoted identifiers so mixed-case or special-character names round-trip.
     *
     * @return the DDL script, or {@code null} if any catalog query failed (the failure is logged)
     */
    public String getTableDDLManually() {
        LOGGER.info("Starting fetching DDL manually for table " + tableName);
        String tableDDL = null;
        try {
            // Names are embedded in SQL string literals below, so double any single quote.
            final String schemaLiteral = escapeSqlLiteral(schemaName);
            final String tableLiteral = escapeSqlLiteral(pureTableName);

            // --- table oid -------------------------------------------------------------
            String findOidSql = String.format("SELECT c.oid,\n" +
                    "  n.nspname,\n" +
                    "  c.relname\n" +
                    "FROM pg_catalog.pg_class c\n" +
                    "     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace\n" +
                    "WHERE c.relname = '%s'\n" +
                    "  AND n.nspname = '%s'\n" +
                    "ORDER BY 2, 3", tableLiteral, schemaLiteral);
            // oid is an unsigned 32-bit value, so it is read as long to avoid int overflow.
            final long oid = client.sql(conn -> {
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findOidSql)) {
                    if (!rs.next()) {
                        throw new SQLException("Table " + tableName + " not found in pg_catalog.pg_class");
                    }
                    return rs.getLong("oid");
                }
            }).get();

            // --- parent table (only set when this table is a partition child) -----------
            String findParentSql = String.format("SELECT inhparent::pg_catalog.regclass,\n" +
                    "  pg_catalog.pg_get_expr(c.relpartbound, inhrelid)\n" +
                    "FROM pg_catalog.pg_class c JOIN pg_catalog.pg_inherits i ON c.oid = inhrelid\n" +
                    "WHERE c.oid = '%d' AND c.relispartition", oid);
            String parentInfo = client.sql(conn -> {
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findParentSql)) {
                    if (!rs.next()) {
                        return null;
                    }
                    String parent = rs.getString("inhparent");
                    String bound = rs.getString("pg_get_expr");
                    return "PARTITION OF " + parent + ' ' + bound;
                }
            }).get();

            // --- column list and primary key -------------------------------------------
            // ORDER BY ordinal_position: without it the row order (and therefore the column
            // order of the generated table) is not guaranteed.
            String findColumnsSql = String.format("select column_name, data_type, is_nullable, character_maximum_length from information_schema.columns " +
                    "where table_schema = '%s' and table_name = '%s' order by ordinal_position", schemaLiteral, tableLiteral);
            String findPrimaryKeySql = String.format("SELECT conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef\n" +
                    "FROM pg_catalog.pg_constraint r\n" +
                    "WHERE r.conrelid = '%d' AND r.contype = 'p' ORDER BY 1", oid);
            String columnInfo = client.sql(conn -> {
                StringBuilder columns = new StringBuilder("(\n");
                try (Statement stmt = conn.createStatement()) {
                    try (ResultSet rs = stmt.executeQuery(findColumnsSql)) {
                        boolean first = true;
                        while (rs.next()) {
                            // Separator goes before every column but the first, so an empty
                            // result leaves the opening parenthesis intact.
                            if (!first) {
                                columns.append(",\n");
                            }
                            first = false;
                            columns.append("    ")
                                    .append(quoteSqlIdentifier(rs.getString("column_name")))
                                    .append(' ')
                                    .append(rs.getString("data_type"));
                            // getInt yields 0 for SQL NULL, i.e. no length modifier.
                            int maxLength = rs.getInt("character_maximum_length");
                            if (maxLength > 0) {
                                columns.append('(').append(maxLength).append(')');
                            }
                            if ("NO".equals(rs.getString("is_nullable"))) {
                                columns.append(" NOT NULL");
                            }
                        }
                    }
                    try (ResultSet rs = stmt.executeQuery(findPrimaryKeySql)) {
                        if (rs.next()) {
                            String primaryKey = rs.getString("condef");
                            if (primaryKey != null) {
                                columns.append("\n    ,").append(primaryKey);
                            }
                        }
                    }
                }
                columns.append("\n)");
                return columns.toString();
            }).get();

            // --- partition key (only set when this table is a partitioned parent) -------
            String findPartitionSql = String.format("SELECT pg_catalog.pg_get_partkeydef('%d'::pg_catalog.oid)", oid);
            String partitionInfo = client.sql(conn -> {
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findPartitionSql)) {
                    if (!rs.next()) {
                        return null;
                    }
                    String partKeyDef = rs.getString("pg_get_partkeydef");
                    return partKeyDef == null ? null : "\nPARTITION BY " + partKeyDef;
                }
            }).get();

            StringBuilder createTable = new StringBuilder(
                    String.format("CREATE TABLE %s.%s ", schemaName, pureTableName));
            if (parentInfo != null) {
                createTable.append(parentInfo);
            } else {
                createTable.append(columnInfo);
                if (partitionInfo != null) {
                    createTable.append(partitionInfo);
                }
            }
            createTable.append(";\n");

            // --- table properties -------------------------------------------------------
            String findPropertiesSQl1 = String.format("select property_key, property_value from hologres.holo_table_properties where table_namespace = '%s' and table_name = '%s'", schemaLiteral, tableLiteral);
            String findPropertiesSQl2 = String.format("select property_key, property_value from hologres.hg_table_properties where table_namespace = '%s' and table_name = '%s'", schemaLiteral, tableLiteral);
            final List<String> possibleProperties = Arrays.asList("orientation", "clustering_key", "segment_key", "bitmap_columns", "dictionary_encoding_columns", "distribution_key", "time_to_live_in_seconds", "storage_format");
            String tableProperties = client.sql(conn -> {
                StringBuilder calls = new StringBuilder();
                try (Statement stmt = conn.createStatement()) {
                    ResultSet queried;
                    try {
                        queried = stmt.executeQuery(findPropertiesSQl1);
                    } catch (SQLException e) {
                        // First properties relation could not be queried; try the other one.
                        queried = stmt.executeQuery(findPropertiesSQl2);
                    }
                    try (ResultSet rs = queried) {
                        while (rs.next()) {
                            String propertyKey = rs.getString("property_key");
                            if (possibleProperties.contains(propertyKey)) {
                                calls.append(String.format("CALL set_table_property('%s', '%s', '%s');\n",
                                        escapeSqlLiteral(tableName), propertyKey,
                                        escapeSqlLiteral(rs.getString("property_value"))));
                            }
                        }
                    }
                }
                return calls.toString();
            }).get();

            // --- table comment ----------------------------------------------------------
            String findTableCommentSql = String.format("SELECT obj_description(%d)", oid);
            String tableComment = client.sql(conn -> {
                String comment = "NULL";
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findTableCommentSql)) {
                    if (rs.next()) {
                        String description = rs.getString("obj_description");
                        if (description != null) {
                            comment = "'" + escapeSqlLiteral(description) + "'";
                        }
                    }
                }
                return String.format("COMMENT ON TABLE %s IS %s;\n", tableName, comment);
            }).get();

            // --- table owner ------------------------------------------------------------
            String findOwnerSql = String.format("SELECT tableowner FROM pg_catalog.pg_tables where schemaname = '%s' and tablename = '%s'", schemaLiteral, tableLiteral);
            String tableOwner = client.sql(conn -> {
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findOwnerSql)) {
                    if (!rs.next()) {
                        throw new SQLException("Owner of table " + tableName + " not found in pg_catalog.pg_tables");
                    }
                    String owner = rs.getString("tableowner");
                    return String.format("ALTER TABLE %s OWNER TO %s;\n", tableName, quoteSqlIdentifier(owner));
                }
            }).get();

            // --- column comments --------------------------------------------------------
            String findColCommentsSql = String.format("select column_name, col_description('%d', ordinal_position)\n" +
                    "from information_schema.columns\n" +
                    "where table_schema = '%s' and table_name = '%s'", oid, schemaLiteral, tableLiteral);
            String columnComments = client.sql(conn -> {
                StringBuilder colComments = new StringBuilder();
                try (Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(findColCommentsSql)) {
                    while (rs.next()) {
                        String comment = rs.getString("col_description");
                        if (comment != null) {
                            colComments.append(String.format("COMMENT ON COLUMN %s.%s IS '%s';\n",
                                    tableName,
                                    quoteSqlIdentifier(rs.getString("column_name")),
                                    escapeSqlLiteral(comment)));
                        }
                    }
                }
                return colComments.toString();
            }).get();

            // A partition child takes its properties from the parent, so none are emitted.
            if (parentInfo != null) {
                tableDDL = "BEGIN;\n\n" + createTable + '\n' + tableComment + tableOwner + columnComments + "\nEND;\n";
            } else {
                tableDDL = "BEGIN;\n\n" + createTable + '\n' + tableProperties + '\n' + tableComment + tableOwner + columnComments + "\nEND;\n";
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers instead of silently swallowing it.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("Failed fetching DDL for table " + tableName, e);
        }
        return tableDDL;
    }

    /** Doubles single quotes so the value can be placed between '...' in SQL; null stays null. */
    private static String escapeSqlLiteral(String value) {
        return value == null ? null : value.replace("'", "''");
    }

    /** Wraps a name in double quotes, doubling embedded double quotes, to form a quoted SQL identifier. */
    private static String quoteSqlIdentifier(String identifier) {
        return '"' + String.valueOf(identifier).replace("\"", "\"\"") + '"';
    }