in phoenix-core-client/src/main/java/org/apache/phoenix/util/UpgradeUtil.java [952:1181]
/**
 * Upgrades SYSTEM.CATALOG metadata to support adding columns to tables that have views.
 *
 * <p>For every physical table that is referenced by at least one view (through a
 * {@code LinkType.PHYSICAL_TABLE} link row), the columns of the physical table are compared,
 * in ordinal-position order, against the leading columns of each of its views:
 * <ul>
 *   <li>if every base table column is matched by the view, the view's header row gets the
 *       number of base table columns as its base column count;</li>
 *   <li>if a view column differs from the base table column at the same position, or the view
 *       has fewer matching columns than the base table, the view's header row gets
 *       {@code DIVERGED_VIEW_BASE_COLUMN_COUNT};</li>
 *   <li>the header row of the base table itself gets {@code BASE_TABLE_BASE_COLUMN_COUNT}.</li>
 * </ul>
 * Changes are committed once per physical table. If any view header row was updated, the
 * metadata cache is cleared at the end through the connection's query services.
 *
 * @param oldMetaConnection connection from which a new connection at
 *        {@code HConstants.LATEST_TIMESTAMP} is derived; the derived connection is closed
 *        before this method returns, the passed-in connection is not closed here
 * @throws SQLException if reading from or writing to SYSTEM.CATALOG fails
 */
public static void upgradeTo4_5_0(PhoenixConnection oldMetaConnection) throws SQLException {
    PhoenixConnection metaConnection = null;
    try {
        // Need to use own connection with max time stamp to be able to read all data from SYSTEM.CATALOG
        metaConnection = new PhoenixConnection(oldMetaConnection, HConstants.LATEST_TIMESTAMP);
        LOGGER.info("Upgrading metadata to support adding columns to tables with views");
        String getBaseTableAndViews = "SELECT "
                + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, "
                + TENANT_ID + ", "
                + TABLE_SCHEM + " AS VIEW_SCHEMA, "
                + TABLE_NAME + " AS VIEW_NAME "
                + "FROM " + SYSTEM_CATALOG_NAME
                + " WHERE " + COLUMN_FAMILY + " IS NOT NULL " // column_family column points to the physical table name.
                + " AND " + COLUMN_NAME + " IS NULL "
                + " AND " + LINK_TYPE + " = ? ";
        // Build a map of base table name -> list of views on the table.
        Map<String, List<ViewKey>> parentTableViewsMap = new HashMap<>();
        try (PreparedStatement stmt = metaConnection.prepareStatement(getBaseTableAndViews)) {
            // Get back view rows that have links back to the base physical table. This takes care
            // of cases when we have a hierarchy of views too.
            stmt.setByte(1, LinkType.PHYSICAL_TABLE.getSerializedValue());
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // this is actually SCHEMANAME.TABLENAME
                    String parentTable = rs.getString("BASE_PHYSICAL_TABLE");
                    String tenantId = rs.getString(TENANT_ID);
                    String viewSchema = rs.getString("VIEW_SCHEMA");
                    String viewName = rs.getString("VIEW_NAME");
                    List<ViewKey> viewKeysList = parentTableViewsMap.get(parentTable);
                    if (viewKeysList == null) {
                        viewKeysList = new ArrayList<>();
                        parentTableViewsMap.put(parentTable, viewKeysList);
                    }
                    viewKeysList.add(new ViewKey(tenantId, viewSchema, viewName));
                }
            }
        }
        boolean clearCache = false;
        for (Entry<String, List<ViewKey>> entry : parentTableViewsMap.entrySet()) {
            // Fetch column information for the base physical table
            String physicalTable = entry.getKey();
            String schemaPart = SchemaUtil.getSchemaNameFromFullName(physicalTable);
            // An empty schema is stored as NULL in SYSTEM.CATALOG, so normalize it to null here.
            String baseTableSchemaName = StringUtil.EMPTY_STRING.equals(schemaPart) ? null : schemaPart;
            String baseTableName = SchemaUtil.getTableNameFromFullName(physicalTable);
            List<ColumnDetails> basePhysicalTableColumns = new ArrayList<>();
            // Columns fetched in order of ordinal position
            String fetchColumnInfoTemplate = "SELECT " +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "FROM SYSTEM.CATALOG " +
                    "WHERE " +
                    "TABLE_SCHEM %s " +
                    "AND TABLE_NAME = ? " +
                    "AND COLUMN_NAME IS NOT NULL " +
                    "AND LINK_TYPE IS NULL " +
                    "ORDER BY " +
                    ORDINAL_POSITION;
            // A null schema cannot be bound with "= ?", so the predicate itself is switched.
            String schemaPredicate = baseTableSchemaName == null ? "IS NULL " : " = ? ";
            String fetchColumnInfoForBasePhysicalTable =
                    String.format(fetchColumnInfoTemplate, schemaPredicate);
            try (PreparedStatement stmt =
                    metaConnection.prepareStatement(fetchColumnInfoForBasePhysicalTable)) {
                int baseParamIndex = 1;
                if (baseTableSchemaName != null) {
                    stmt.setString(baseParamIndex++, baseTableSchemaName);
                }
                stmt.setString(baseParamIndex, baseTableName);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        basePhysicalTableColumns.add(new ColumnDetails(
                                rs.getString(COLUMN_FAMILY),
                                rs.getString(COLUMN_NAME),
                                rs.getInt(ORDINAL_POSITION),
                                rs.getInt(DATA_TYPE),
                                rs.getInt(COLUMN_SIZE),
                                rs.getInt(DECIMAL_DIGITS),
                                rs.getInt(SORT_ORDER),
                                rs.getInt(ARRAY_SIZE)));
                    }
                }
            }
            // Fetch column information for all the views on the base physical table ordered by ordinal position.
            List<ViewKey> viewKeys = entry.getValue();
            StringBuilder sb = new StringBuilder();
            sb.append("SELECT " +
                    TENANT_ID + "," +
                    TABLE_SCHEM + "," +
                    TABLE_NAME + "," +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "FROM SYSTEM.CATALOG " +
                    "WHERE " +
                    COLUMN_NAME + " IS NOT NULL " +
                    "AND " +
                    ORDINAL_POSITION + " <= ? " + // fetch only those columns that would impact setting of base column count
                    "AND " +
                    "(" + TENANT_ID+ ", " + TABLE_SCHEM + ", " + TABLE_NAME + ") IN (");
            int numViews = viewKeys.size();
            for (int i = 0; i < numViews; i++) {
                sb.append(" (?, ?, ?) ");
                if (i < numViews - 1) {
                    sb.append(", ");
                }
            }
            sb.append(" ) ");
            sb.append(" GROUP BY " +
                    TENANT_ID + "," +
                    TABLE_SCHEM + "," +
                    TABLE_NAME + "," +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "ORDER BY " +
                    TENANT_ID + "," + TABLE_SCHEM + ", " + TABLE_NAME + ", " + ORDINAL_POSITION);
            String fetchViewColumnsSql = sb.toString();
            int numColsInBaseTable = basePhysicalTableColumns.size();
            try (PreparedStatement stmt = metaConnection.prepareStatement(fetchViewColumnsSql)) {
                int paramIndex = 1;
                stmt.setInt(paramIndex++, numColsInBaseTable);
                for (ViewKey view : viewKeys) {
                    stmt.setString(paramIndex++, view.tenantId);
                    stmt.setString(paramIndex++, view.schema);
                    stmt.setString(paramIndex++, view.name);
                }
                String currentTenantId = null;
                String currentViewSchema = null;
                String currentViewName = null;
                try (ResultSet rs = stmt.executeQuery()) {
                    int numBaseTableColsMatched = 0;
                    boolean ignore = false;
                    boolean baseColumnCountUpserted = false;
                    while (rs.next()) {
                        String viewTenantId = rs.getString(TENANT_ID);
                        String viewSchema = rs.getString(TABLE_SCHEM);
                        String viewName = rs.getString(TABLE_NAME);
                        if (!(Objects.equal(viewTenantId, currentTenantId) && Objects.equal(viewSchema, currentViewSchema) && Objects.equal(viewName, currentViewName))) {
                            // We are about to iterate through columns of a different view. Check whether base column count was upserted.
                            // If it wasn't then it is likely the case that a column inherited from the base table was dropped from view.
                            if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                                upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                clearCache = true;
                            }
                            // reset the values as we are now going to iterate over columns of a new view.
                            numBaseTableColsMatched = 0;
                            currentTenantId = viewTenantId;
                            currentViewSchema = viewSchema;
                            currentViewName = viewName;
                            ignore = false;
                            baseColumnCountUpserted = false;
                        }
                        if (!ignore) {
                            /*
                             * Iterate over all the columns of the base physical table and the columns of the view. Compare the
                             * two till one of the following happens:
                             *
                             * 1) We run into a view column which is different from column in the base physical table.
                             * This means that the view has diverged from the base physical table. In such a case
                             * we will set a special value for the base column count. That special value will also be used
                             * on the server side to filter out the diverged view so that meta-data changes on the base
                             * physical table are not propagated to it.
                             *
                             * 2) Every physical table column is present in the view. In that case we set the base column count
                             * as the number of columns in the base physical table. At that point we ignore rest of the columns
                             * of the view.
                             *
                             */
                            ColumnDetails baseTableColumn = basePhysicalTableColumns.get(numBaseTableColsMatched);
                            String columName = rs.getString(COLUMN_NAME);
                            String columnFamily = rs.getString(COLUMN_FAMILY);
                            int ordinalPos = rs.getInt(ORDINAL_POSITION);
                            int dataType = rs.getInt(DATA_TYPE);
                            int columnSize = rs.getInt(COLUMN_SIZE);
                            int decimalDigits = rs.getInt(DECIMAL_DIGITS);
                            int sortOrder = rs.getInt(SORT_ORDER);
                            int arraySize = rs.getInt(ARRAY_SIZE);
                            ColumnDetails viewColumn = new ColumnDetails(columnFamily, columName, ordinalPos, dataType, columnSize, decimalDigits, sortOrder, arraySize);
                            if (baseTableColumn.equals(viewColumn)) {
                                numBaseTableColsMatched++;
                                if (numBaseTableColsMatched == numColsInBaseTable) {
                                    upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, numColsInBaseTable);
                                    // No need to ignore the rest of the columns of the view here since the
                                    // query retrieved only those columns that had ordinal position <= numColsInBaseTable
                                    baseColumnCountUpserted = true;
                                    clearCache = true;
                                }
                            } else {
                                // special value to denote that the view has diverged from the base physical table.
                                upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                baseColumnCountUpserted = true;
                                clearCache = true;
                                // ignore rest of the rows for the view.
                                ignore = true;
                            }
                        }
                    }
                    // The check inside the loop only fires when the result set moves on to another view,
                    // so the final view has to be examined here. If it matched fewer columns than the base
                    // table has and nothing was upserted for it, treat it as diverged like any other view.
                    if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                        upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                        clearCache = true;
                    }
                }
            }
            // set base column count for the header row of the base table too. We use this information
            // to figure out whether the upgrade is in progress or hasn't started.
            upsertBaseColumnCountInHeaderRow(metaConnection, null, baseTableSchemaName, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
            metaConnection.commit();
        }
        // clear metadata cache on region servers to force loading of the latest metadata
        if (clearCache) {
            metaConnection.getQueryServices().clearCache();
        }
    } finally {
        if (metaConnection != null) {
            metaConnection.close();
        }
    }
}