public static String getCreateTableSql()

in src/main/java/com/uber/uberscriptquery/jdbc/JdbcUtils.java [30:176]


    /**
     * Builds a {@code CREATE TABLE IF NOT EXISTS} statement for the given columns.
     *
     * <p>Column definitions are emitted in this order: primary-key columns, then index columns
     * that are not primary keys, then all remaining columns. Within each group the relative order
     * of {@code columnNames} is preserved. Names in {@code primaryKeys}, {@code indexColumns} and
     * {@code textColumns} are matched against {@code columnNames} case-insensitively, using
     * {@link java.util.Locale#ROOT} so the result does not depend on the JVM default locale.
     *
     * <p>After the column definitions, a {@code PRIMARY KEY(...)} clause is appended if any
     * primary keys are given (listed in the order of {@code primaryKeys}). Then one
     * {@code INDEX index_<col> (<col>)} clause is appended per entry of {@code indexColumns}.
     * Table and column names are inserted verbatim; they are not quoted or escaped.
     *
     * @param columnNames  column names; must be non-empty and the same size as {@code columnTypes}
     * @param columnTypes  Spark data types, positionally parallel to {@code columnNames}
     * @param tableName    name of the table to create
     * @param primaryKeys  primary-key column names; {@code null} is treated as an empty list
     * @param indexColumns columns to create an index for; {@code null} is treated as an empty list
     * @param textColumns  columns flagged as text when resolving the JDBC type;
     *                     {@code null} is treated as an empty list
     * @return the generated SQL statement
     * @throws RuntimeException if {@code columnNames} and {@code columnTypes} differ in size, if
     *                          no columns are given, or if the number of columns matching
     *                          {@code primaryKeys} / {@code indexColumns} differs from the number
     *                          of names given in that list
     */
    public static String getCreateTableSql(List<String> columnNames, List<org.apache.spark.sql.types.DataType> columnTypes, String tableName, List<String> primaryKeys, List<String> indexColumns, List<String> textColumns) {
        if (columnNames.size() != columnTypes.size()) {
            throw new RuntimeException(String.format("Number of column names (%s) is different from number of column types (%s)", columnNames.size(), columnTypes.size()));
        }

        // A table needs at least one column; fail with a clear message instead of an
        // IndexOutOfBoundsException further down.
        if (columnNames.isEmpty()) {
            throw new RuntimeException(String.format("Cannot create table %s without any columns", tableName));
        }

        // All three optional lists are treated uniformly: null means "none".
        if (primaryKeys == null) {
            primaryKeys = new ArrayList<>();
        }

        if (indexColumns == null) {
            indexColumns = new ArrayList<>();
        }

        if (textColumns == null) {
            textColumns = new ArrayList<>();
        }

        LOG.info(String.format("Getting sql to create table, column name: %s, column types: %s, table name: %s, primary keys: %s, index columns: %s, text columns: %s",
                StringUtils.join(columnNames, ','),
                StringUtils.join(columnTypes, ','),
                tableName,
                StringUtils.join(primaryKeys, ','),
                StringUtils.join(indexColumns, ','),
                StringUtils.join(textColumns, ',')));

        Set<String> primaryKeysLowerCase = lowerCaseColumnNameSet(primaryKeys);
        Set<String> indexColumnsLowerCase = lowerCaseColumnNameSet(indexColumns);
        Set<String> textColumnsLowerCase = lowerCaseColumnNameSet(textColumns);

        List<org.apache.spark.sql.types.DataType> primaryKeyTypes = new ArrayList<>();
        List<String> matchedPrimaryKeys = new ArrayList<>();

        List<org.apache.spark.sql.types.DataType> indexColumnTypes = new ArrayList<>();
        List<String> matchedIndexColumns = new ArrayList<>();

        List<String> otherColumnNames = new ArrayList<>();
        List<org.apache.spark.sql.types.DataType> otherColumnTypes = new ArrayList<>();

        // Classify every column. A column can be both a primary key and an index column.
        for (int i = 0; i < columnNames.size(); i++) {
            String columnName = columnNames.get(i);
            org.apache.spark.sql.types.DataType columnType = columnTypes.get(i);
            String lowerCaseName = columnName.toLowerCase(java.util.Locale.ROOT);
            boolean isPrimaryKey = primaryKeysLowerCase.contains(lowerCaseName);
            boolean isIndexColumn = indexColumnsLowerCase.contains(lowerCaseName);

            if (isPrimaryKey) {
                primaryKeyTypes.add(columnType);
                matchedPrimaryKeys.add(columnName);
            }
            if (isIndexColumn) {
                indexColumnTypes.add(columnType);
                matchedIndexColumns.add(columnName);
            }
            if (!isPrimaryKey && !isIndexColumn) {
                otherColumnNames.add(columnName);
                otherColumnTypes.add(columnType);
            }
        }

        // Every given key must correspond to a column; a count mismatch means some key is
        // missing from (or duplicated in) the column list.
        if (primaryKeyTypes.size() != primaryKeys.size()) {
            throw new RuntimeException(String.format("Invalid primary keys. There are %s in matched primary keys (%s), but %s in given primary keys (%s)",
                    primaryKeyTypes.size(),
                    StringUtils.join(matchedPrimaryKeys, ','),
                    primaryKeys.size(),
                    StringUtils.join(primaryKeys, ',')));
        }

        if (indexColumnTypes.size() != indexColumns.size()) {
            throw new RuntimeException(String.format("Invalid index fields. There are %s matched index columns (%s), but %s in given index columns (%s)",
                    indexColumnTypes.size(),
                    StringUtils.join(matchedIndexColumns, ','),
                    indexColumns.size(),
                    StringUtils.join(indexColumns, ',')));
        }

        // Order columns in sequence: Primary Keys, Index Columns (exclude primary keys), Other Columns
        List<String> reorderedColumnNames = new ArrayList<>();
        List<org.apache.spark.sql.types.DataType> reorderedColumnTypes = new ArrayList<>();

        reorderedColumnNames.addAll(matchedPrimaryKeys);
        reorderedColumnTypes.addAll(primaryKeyTypes);

        for (int i = 0; i < matchedIndexColumns.size(); i++) {
            String columnName = matchedIndexColumns.get(i);
            if (primaryKeysLowerCase.contains(columnName.toLowerCase(java.util.Locale.ROOT))) {
                // Already emitted in the primary-key group.
                continue;
            }
            reorderedColumnNames.add(columnName);
            reorderedColumnTypes.add(indexColumnTypes.get(i));
        }

        reorderedColumnNames.addAll(otherColumnNames);
        reorderedColumnTypes.addAll(otherColumnTypes);

        // Sanity check: re-ordering must neither drop nor duplicate columns.
        if (reorderedColumnNames.size() != columnNames.size()) {
            throw new RuntimeException(String.format("Invalid columns after re-ordering: %s (%s) v.s. %s (%s)",
                    reorderedColumnNames.size(),
                    StringUtils.join(reorderedColumnNames, ','),
                    columnNames.size(),
                    StringUtils.join(columnNames, ',')));
        }

        StringBuilder sb = new StringBuilder();
        sb.append(String.format("CREATE TABLE IF NOT EXISTS %s (", tableName));

        for (int i = 0; i < reorderedColumnNames.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            String columnName = reorderedColumnNames.get(i);
            String lowerCaseName = columnName.toLowerCase(java.util.Locale.ROOT);
            sb.append(columnName);
            sb.append(" ");
            // The second argument is true for primary-key and index columns; the third marks
            // text columns. NOTE(review): exact effect depends on getJdbcTypeString — confirm there.
            sb.append(getJdbcTypeString(
                    reorderedColumnTypes.get(i),
                    primaryKeysLowerCase.contains(lowerCaseName) || indexColumnsLowerCase.contains(lowerCaseName),
                    textColumnsLowerCase.contains(lowerCaseName)));
        }

        // Sizes were validated above, so iterating the given lists is equivalent to iterating
        // the matched ones; the caller-supplied order and spelling are used in the clauses.
        if (!primaryKeys.isEmpty()) {
            sb.append(", PRIMARY KEY(");
            for (int i = 0; i < primaryKeys.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(primaryKeys.get(i));
            }
            sb.append(")");
        }

        for (String indexColumn : indexColumns) {
            sb.append(String.format(", INDEX index_%s (%s)",
                    indexColumn,
                    indexColumn));
        }

        sb.append(")");
        return sb.toString();
    }

    /**
     * Returns the set of the given names lower-cased with {@link java.util.Locale#ROOT}, so
     * case-insensitive column matching does not depend on the JVM default locale.
     */
    private static Set<String> lowerCaseColumnNameSet(List<String> names) {
        Set<String> result = new HashSet<>();
        for (String name : names) {
            result.add(name.toLowerCase(java.util.Locale.ROOT));
        }
        return result;
    }