public static void upgradeTo4_5_0()

in phoenix-core-client/src/main/java/org/apache/phoenix/util/UpgradeUtil.java [952:1181]


    /**
     * Upgrades the metadata in SYSTEM.CATALOG to support adding columns to tables that have views.
     * <p>
     * For every base physical table that has at least one view linked to it, the columns of the base
     * table are compared, in ordinal position order, with the leading columns of each view:
     * <ul>
     * <li>if every base table column is present in the view, the base column count of the view is
     * set to the number of columns in the base table;</li>
     * <li>if a view column differs from the corresponding base table column, or the view has fewer
     * matching columns than the base table, the view is marked with
     * {@code DIVERGED_VIEW_BASE_COLUMN_COUNT};</li>
     * <li>the header row of the base table itself is marked with
     * {@code BASE_TABLE_BASE_COLUMN_COUNT}.</li>
     * </ul>
     * Changes are committed once per base physical table. If any view header row was updated, the
     * metadata cache is cleared through the connection's query services once all tables are done.
     *
     * @param oldMetaConnection connection used to derive a new connection at
     *            {@code HConstants.LATEST_TIMESTAMP}; it is not closed by this method
     * @throws SQLException if reading from or writing to SYSTEM.CATALOG fails
     */
    public static void upgradeTo4_5_0(PhoenixConnection oldMetaConnection) throws SQLException {
        // Need to use own connection with max time stamp to be able to read all data from SYSTEM.CATALOG.
        // try-with-resources closes it on every exit path without masking the primary exception.
        try (PhoenixConnection metaConnection =
                new PhoenixConnection(oldMetaConnection, HConstants.LATEST_TIMESTAMP)) {
            LOGGER.info("Upgrading metadata to support adding columns to tables with views");
            String getBaseTableAndViews = "SELECT "
                    + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, "
                    + TENANT_ID + ", "
                    + TABLE_SCHEM + " AS VIEW_SCHEMA, "
                    + TABLE_NAME + " AS VIEW_NAME "
                    + "FROM " + SYSTEM_CATALOG_NAME 
                    + " WHERE " + COLUMN_FAMILY + " IS NOT NULL " // column_family column points to the physical table name.
                    + " AND " + COLUMN_NAME + " IS NULL "
                    + " AND " + LINK_TYPE + " = ? ";
            // Build a map of base table name -> list of views on the table.
            Map<String, List<ViewKey>> parentTableViewsMap = new HashMap<>();
            try (PreparedStatement stmt = metaConnection.prepareStatement(getBaseTableAndViews)) {
                // Get back view rows that have links back to the base physical table. This takes care
                // of cases when we have a hierarchy of views too.
                stmt.setByte(1, LinkType.PHYSICAL_TABLE.getSerializedValue());
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // this is actually SCHEMANAME.TABLENAME
                        String parentTable = rs.getString("BASE_PHYSICAL_TABLE");
                        String tenantId = rs.getString(TENANT_ID);
                        String viewSchema = rs.getString("VIEW_SCHEMA");
                        String viewName = rs.getString("VIEW_NAME");
                        List<ViewKey> viewKeysList = parentTableViewsMap.get(parentTable);
                        if (viewKeysList == null) {
                            viewKeysList = new ArrayList<>();
                            parentTableViewsMap.put(parentTable, viewKeysList);
                        }
                        viewKeysList.add(new ViewKey(tenantId, viewSchema, viewName));
                    }
                }
            }
            boolean clearCache = false;
            for (Entry<String, List<ViewKey>> entry : parentTableViewsMap.entrySet()) {
                // Fetch column information for the base physical table
                String physicalTable = entry.getKey();
                String schemaFromFullName = SchemaUtil.getSchemaNameFromFullName(physicalTable);
                // An empty schema name is stored as NULL in SYSTEM.CATALOG.
                String baseTableSchemaName = schemaFromFullName.equals(StringUtil.EMPTY_STRING) ? null : schemaFromFullName;
                String baseTableName = SchemaUtil.getTableNameFromFullName(physicalTable);
                List<ColumnDetails> basePhysicalTableColumns = new ArrayList<>();

                // Columns fetched in order of ordinal position
                String fetchColumnInfoTemplate = "SELECT " +
                        COLUMN_NAME + "," +
                        COLUMN_FAMILY + "," +
                        DATA_TYPE + "," +
                        COLUMN_SIZE + "," +
                        DECIMAL_DIGITS + "," +
                        ORDINAL_POSITION + "," +
                        SORT_ORDER + "," +
                        ARRAY_SIZE + " " +
                        "FROM SYSTEM.CATALOG " +
                        "WHERE " +
                        "TABLE_SCHEM %s " +
                        "AND TABLE_NAME = ? " +
                        "AND COLUMN_NAME IS NOT NULL " +
                        "AND LINK_TYPE IS NULL " +
                        "ORDER BY " + 
                        ORDINAL_POSITION;

                // A NULL schema cannot be matched with "= ?", so the predicate itself is switched.
                String schemaPredicate = baseTableSchemaName == null ? "IS NULL " : " = ? ";
                String fetchColumnInfoForBasePhysicalTable =
                        String.format(fetchColumnInfoTemplate, schemaPredicate);
                try (PreparedStatement baseColumnsStmt =
                        metaConnection.prepareStatement(fetchColumnInfoForBasePhysicalTable)) {
                    int baseParamIndex = 1;
                    if (baseTableSchemaName != null) {
                        baseColumnsStmt.setString(baseParamIndex++, baseTableSchemaName);
                    }
                    baseColumnsStmt.setString(baseParamIndex, baseTableName);
                    try (ResultSet rs = baseColumnsStmt.executeQuery()) {
                        while (rs.next()) {
                            basePhysicalTableColumns.add(new ColumnDetails(rs.getString(COLUMN_FAMILY), rs
                                    .getString(COLUMN_NAME), rs.getInt(ORDINAL_POSITION), rs
                                    .getInt(DATA_TYPE), rs.getInt(COLUMN_SIZE), rs.getInt(DECIMAL_DIGITS),
                                    rs.getInt(SORT_ORDER), rs.getInt(ARRAY_SIZE)));
                        }
                    }
                }

                // Fetch column information for all the views on the base physical table ordered by ordinal position.
                List<ViewKey> viewKeys = entry.getValue();
                StringBuilder sb = new StringBuilder();
                sb.append("SELECT " + 
                        TENANT_ID + "," +
                        TABLE_SCHEM + "," +
                        TABLE_NAME + "," +
                        COLUMN_NAME + "," +
                        COLUMN_FAMILY + "," +
                        DATA_TYPE + "," +
                        COLUMN_SIZE + "," +
                        DECIMAL_DIGITS + "," +
                        ORDINAL_POSITION + "," +
                        SORT_ORDER + "," +
                        ARRAY_SIZE + " " + 
                        "FROM SYSTEM.CATALOG " +
                        "WHERE " +
                        COLUMN_NAME + " IS NOT NULL " +
                        "AND " +
                        ORDINAL_POSITION + " <= ? " + // fetch only those columns that would impact setting of base column count
                        "AND " +
                        "(" + TENANT_ID+ ", " + TABLE_SCHEM + ", " + TABLE_NAME + ") IN (");

                // One (tenant, schema, name) row value constructor per view.
                int numViews = viewKeys.size();
                for (int i = 0; i < numViews; i++) {
                    sb.append(" (?, ?, ?) ");
                    if (i < numViews - 1) {
                        sb.append(", ");
                    }
                }
                sb.append(" ) ");
                sb.append(" GROUP BY " +
                        TENANT_ID + "," +
                        TABLE_SCHEM + "," +
                        TABLE_NAME + "," +
                        COLUMN_NAME + "," +
                        COLUMN_FAMILY + "," +
                        DATA_TYPE + "," +
                        COLUMN_SIZE + "," +
                        DECIMAL_DIGITS + "," +
                        ORDINAL_POSITION + "," +
                        SORT_ORDER + "," +
                        ARRAY_SIZE + " " + 
                        "ORDER BY " + 
                        TENANT_ID + "," + TABLE_SCHEM + ", " + TABLE_NAME + ", " + ORDINAL_POSITION);
                String fetchViewColumnsSql = sb.toString();
                int numColsInBaseTable = basePhysicalTableColumns.size();
                try (PreparedStatement viewColumnsStmt = metaConnection.prepareStatement(fetchViewColumnsSql)) {
                    int paramIndex = 1;
                    viewColumnsStmt.setInt(paramIndex++, numColsInBaseTable);
                    for (ViewKey view : viewKeys) {
                        viewColumnsStmt.setString(paramIndex++, view.tenantId);
                        viewColumnsStmt.setString(paramIndex++, view.schema);
                        viewColumnsStmt.setString(paramIndex++, view.name);
                    }
                    String currentTenantId = null;
                    String currentViewSchema = null;
                    String currentViewName = null;
                    try (ResultSet rs = viewColumnsStmt.executeQuery()) {
                        int numBaseTableColsMatched = 0;
                        boolean ignore = false;
                        boolean baseColumnCountUpserted = false;
                        while (rs.next()) {
                            String viewTenantId = rs.getString(TENANT_ID);
                            String viewSchema = rs.getString(TABLE_SCHEM);
                            String viewName = rs.getString(TABLE_NAME);
                            if (!(Objects.equal(viewTenantId, currentTenantId) && Objects.equal(viewSchema, currentViewSchema) && Objects.equal(viewName, currentViewName))) {
                                // We are about to iterate through columns of a different view. Check whether base column count was upserted.
                                // If it wasn't then it is likely the case that a column inherited from the base table was dropped from view.
                                if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                                    upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                    clearCache = true;
                                }
                                // reset the values as we are now going to iterate over columns of a new view.
                                numBaseTableColsMatched = 0;
                                currentTenantId = viewTenantId;
                                currentViewSchema = viewSchema;
                                currentViewName = viewName;
                                ignore = false;
                                baseColumnCountUpserted = false;
                            }
                            if (!ignore) {
                                /*
                                 * Iterate over all the columns of the base physical table and the columns of the view. Compare the
                                 * two till one of the following happens:
                                 *
                                 * 1) We run into a view column which is different from column in the base physical table.
                                 * This means that the view has diverged from the base physical table. In such a case
                                 * we will set a special value for the base column count. That special value will also be used
                                 * on the server side to filter out the diverged view so that meta-data changes on the base
                                 * physical table are not propagated to it.
                                 *
                                 * 2) Every physical table column is present in the view. In that case we set the base column count
                                 * as the number of columns in the base physical table. At that point we ignore rest of the columns
                                 * of the view.
                                 *
                                 */
                                ColumnDetails baseTableColumn = basePhysicalTableColumns.get(numBaseTableColsMatched);
                                String columName = rs.getString(COLUMN_NAME);
                                String columnFamily = rs.getString(COLUMN_FAMILY);
                                int ordinalPos = rs.getInt(ORDINAL_POSITION);
                                int dataType = rs.getInt(DATA_TYPE);
                                int columnSize = rs.getInt(COLUMN_SIZE);
                                int decimalDigits = rs.getInt(DECIMAL_DIGITS);
                                int sortOrder = rs.getInt(SORT_ORDER);
                                int arraySize = rs.getInt(ARRAY_SIZE);
                                ColumnDetails viewColumn = new ColumnDetails(columnFamily, columName, ordinalPos, dataType, columnSize, decimalDigits, sortOrder, arraySize);
                                if (baseTableColumn.equals(viewColumn)) {
                                    numBaseTableColsMatched++;
                                    if (numBaseTableColsMatched == numColsInBaseTable) {
                                        upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, numColsInBaseTable);
                                        // No need to ignore the rest of the columns of the view here since the
                                        // query retrieved only those columns that had ordinal position <= numColsInBaseTable
                                        baseColumnCountUpserted = true;
                                        clearCache = true;
                                    }
                                } else {
                                    // special value to denote that the view has diverged from the base physical table.
                                    upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                    baseColumnCountUpserted = true;
                                    clearCache = true;
                                    // ignore rest of the rows for the view.
                                    ignore = true;
                                }
                            }
                        }
                        // The check for a view that is missing inherited columns is otherwise only made when the
                        // cursor moves on to the next view, so the last view in the result set has to be checked
                        // here; without this it would be left with no base column count at all.
                        if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                            upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                            clearCache = true;
                        }
                    }
                }
                // set base column count for the header row of the base table too. We use this information
                // to figure out whether the upgrade is in progress or hasn't started.
                upsertBaseColumnCountInHeaderRow(metaConnection, null, baseTableSchemaName, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
                metaConnection.commit();
            }
            // clear metadata cache on region servers to force loading of the latest metadata
            if (clearCache) {
                metaConnection.getQueryServices().clearCache();
            }
        }
    }