in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java [3453:3751]
private MetaDataMutationResult mutateColumn(
final List<Mutation> tableMetadata,
final ColumnMutator mutator, final int clientVersion,
final PTable parentTable, final PTable transformingNewTable, boolean isAddingOrDroppingColumns) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableOrViewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableOrViewName);
String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName);
// server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
PTableType expectedType = MetaDataUtil.getTableType(tableMetadata,
GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
List<byte[]> tableNamesToDelete = Lists.newArrayList();
List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
try {
Region region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
List<RowLock> locks = Lists.newArrayList();
try {
List<PTable> childViews = Lists.newArrayList();
Optional<MetaDataMutationResult> mutationResult = validateIfMutationAllowedOnParent(
parentTable, tableMetadata,
expectedType, clientTimeStamp, tenantId, schemaName, tableOrViewName,
childViews, clientVersion);
// only if mutation is allowed, we should get Optional.empty() here
if (mutationResult.isPresent()) {
return mutationResult.get();
}
// We take a write row lock for tenantId, schemaName, tableOrViewName
acquireLock(region, key, locks, false);
// Invalidate the cache from all the regionservers.
List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>();
requests.add(new InvalidateServerMetadataCacheRequest(tenantId, schemaName,
tableOrViewName));
invalidateServerMetadataCache(requests);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<>();
invalidateList.add(cacheKey);
PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
if (failConcurrentMutateAddColumnOneTimeForTesting) {
failConcurrentMutateAddColumnOneTimeForTesting = false;
return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), table);
}
if (LOGGER.isDebugEnabled()) {
if (table == null) {
LOGGER.debug("Table " + Bytes.toStringBinary(key)
+ " not found in cache. Will build through scan");
} else {
LOGGER.debug("Table " + Bytes.toStringBinary(key)
+ " found in cache with timestamp " + table.getTimeStamp()
+ " seqNum " + table.getSequenceNumber());
}
}
// Get client timeStamp from mutations
if (table == null && (table = buildTable(key, cacheKey, region,
HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
// if not found then call newerTableExists and add delete marker for timestamp
// found
table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
if (table != null) {
LOGGER.info("Found newer table deleted as of " + table.getTimeStamp()
+ " versus client timestamp of " + clientTimeStamp);
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
// if this is a view or view index then we need to include columns and
// indexes derived from its ancestors
if (parentTable != null) {
Properties props = new Properties();
if (tenantId != null) {
props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(tenantId));
}
if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
}
try (PhoenixConnection connection = getServerConnectionForMetaData(props,
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table,
parentTable);
}
}
if (transformingNewTable !=null) {
table = PTableImpl.builderWithColumns(table, getColumnsToClone(table))
.setTransformingNewTable(transformingNewTable).build();
}
if (table.getTimeStamp() >= clientTimeStamp) {
LOGGER.info("Found newer table as of " + table.getTimeStamp()
+ " versus client timestamp of " + clientTimeStamp);
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), table);
} else if (isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
// lookup TABLE_SEQ_NUM in tableMetaData
long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
+ expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
+ " with " + table.getColumns().size() + " columns: "
+ table.getColumns());
}
if (expectedSeqNum != table.getSequenceNumber()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("For table " + Bytes.toStringBinary(key)
+ " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
}
return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), table);
}
PTableType type = table.getType();
if (type == PTableType.INDEX) {
// Disallow mutation of an index table
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
EnvironmentEdgeManager.currentTimeMillis(), null);
} else {
// We said to drop a table, but found a view or visa versa
if (type != expectedType) {
return new MetaDataProtocol.MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
EnvironmentEdgeManager.currentTimeMillis(), null);
}
}
if (!childViews.isEmpty()) {
// validate the add or drop column mutations
result = mutator.validateWithChildViews(table, childViews, tableMetadata,
schemaName, tableOrViewName);
if (result != null) {
return result;
}
}
getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
SchemaUtil.getTableName(schemaName, tableOrViewName),
TableName.valueOf(table.getPhysicalName().getBytes()),
getParentPhysicalTableName(table), table.getType());
result = mutator.validateAndAddMetadata(table, rowKeyMetaData, tableMetadata,
region, invalidateList, locks, clientTimeStamp, clientVersion,
((ExtendedCellBuilder) env.getCellBuilder()), isAddingOrDroppingColumns);
// if the update mutation caused tables to be deleted, the mutation code returned
// will be MutationCode.TABLE_ALREADY_EXISTS
if (result != null
&& result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
// drop any indexes on the base table that need the column that is going to be
// dropped
List<Pair<PTable, PColumn>> tableAndDroppedColumnPairs =
mutator.getTableAndDroppedColumnPairs();
Iterator<Pair<PTable, PColumn>> iterator = tableAndDroppedColumnPairs.iterator();
while (iterator.hasNext()) {
Pair<PTable, PColumn> pair = iterator.next();
// remove the current table and column being dropped from the list and drop any
// indexes that require the column being dropped while holding the row lock
if (table.equals(pair.getFirst())) {
iterator.remove();
result = dropIndexes(env, pair.getFirst(), invalidateList, locks,
clientTimeStamp, tableMetadata, pair.getSecond(),
tableNamesToDelete, sharedTablesToDelete, clientVersion);
if (result != null
&& result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
}
}
if (table.isChangeDetectionEnabled() || MetaDataUtil.getChangeDetectionEnabled(tableMetadata)) {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
try {
exportSchema(tableMetadata, key, clientTimeStamp, clientVersion, table);
metricsSource.incrementAlterExportCount();
metricsSource.updateAlterExportTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
} catch (Exception e) {
LOGGER.error("Error writing to schema registry", e);
metricsSource.incrementAlterExportFailureCount();
metricsSource.updateAlterExportFailureTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
result = new MetaDataMutationResult(MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY,
EnvironmentEdgeManager.currentTimeMillis(), table);
return result;
}
}
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
// The mutations to add a column are written in the following order:
// 1. Update the encoded column qualifier for the parent table if its on a
// different region server (for tables that use column qualifier encoding)
// if the next step fails we end up wasting a few col qualifiers
// 2. Write the mutations to add the column
List<Mutation> localMutations =
Lists.newArrayListWithExpectedSize(tableMetadata.size());
List<Mutation> remoteMutations = Lists.newArrayList();
separateLocalAndRemoteMutations(region, tableMetadata, localMutations,
remoteMutations);
if (!remoteMutations.isEmpty()) {
// there should only be remote mutations if we are adding a column to a view
// that uses encoded column qualifiers (the remote mutations are to update the
// encoded column qualifier counter on the parent table)
if (( mutator.getMutateColumnType() == ColumnMutator.MutateColumnType.ADD_COLUMN
&& type == PTableType.VIEW
&& table.getEncodingScheme() !=
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)) {
processRemoteRegionMutations(
SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
//if we're a view or index, clear the cache for our parent
if ((type == PTableType.VIEW || type == INDEX) && table.getParentTableName() != null) {
clearRemoteTableFromCache(clientTimeStamp,
table.getParentSchemaName() != null
? table.getParentSchemaName().getBytes()
: ByteUtil.EMPTY_BYTE_ARRAY,
table.getParentTableName().getBytes());
}
} else {
String msg = "Found unexpected mutations while adding or dropping column "
+ "to " + fullTableName;
LOGGER.error(msg);
for (Mutation m : remoteMutations) {
LOGGER.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
LOGGER.debug("Mutation family cell map : " + m.getFamilyCellMap());
}
throw new IllegalStateException(msg);
}
}
// Update SYSTEM.CATALOG indexes only for ordinary table column mutations.
// Column mutations of indexes are not allowed. See above
// Add column on SYSTEM.CATALOG should not be processed for index updates,
// since an index on a future column cannot exist.
boolean
updateCatalogIndexes =
!Bytes.toString(schemaName)
.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations,
Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE,
updateCatalogIndexes);
// Invalidate from cache
for (ImmutableBytesPtr invalidateKey : invalidateList) {
metaDataCache.invalidate(invalidateKey);
}
// Get client timeStamp from mutations, since it may get updated by the
// mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
// if the update mutation caused tables to be deleted just return the result which
// will contain the table to be deleted
if (result != null
&& result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
} else {
table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
clientVersion);
if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
try (PhoenixConnection connection = getServerConnectionForMetaData(
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PTable pTable = connection.getTableNoCache(
table.getParentName().getString());
table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection,
table, pTable);
}
}
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
currentTime, table, tableNamesToDelete, sharedTablesToDelete);
}
} finally {
ServerUtil.releaseRowLocks(locks);
// drop indexes on views that require the column being dropped. These could be on a
// different region server so don't hold row locks while dropping them
for (Pair<PTable, PColumn> pair : mutator.getTableAndDroppedColumnPairs()) {
result = dropRemoteIndexes(env, pair.getFirst(), clientTimeStamp,
pair.getSecond(), tableNamesToDelete, sharedTablesToDelete);
if (result != null
&& result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
}
}
} catch (Throwable t) {
ClientUtil.throwIOException(fullTableName, t);
return null; // impossible
}
}