in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java [4265:4549]
/**
 * Applies a client-requested index state change to the index's metadata row.
 *
 * <p>The first mutation of the request must carry the new INDEX_STATE cell and may carry a new
 * INDEX_DISABLE_TIMESTAMP cell. If the row key does not belong to this region, the result of
 * {@code checkTableKeyInRegion} is returned as-is. Otherwise, while holding a row lock on the
 * index row, this method:
 * <ul>
 *   <li>reads the current state and disable timestamp;</li>
 *   <li>rejects disallowed transitions;</li>
 *   <li>rewrites the requested state where needed (for example ACTIVE/PENDING_ACTIVE to
 *       UNUSABLE is stored as INACTIVE);</li>
 *   <li>keeps the older non-zero disable timestamp for DISABLE, PENDING_ACTIVE and
 *       PENDING_DISABLE;</li>
 *   <li>persists the result together with an empty-column touch of the index row (and of the
 *       data table row when DATA_TABLE_NAME is set) and a last-DDL-timestamp update.</li>
 * </ul>
 *
 * <p>Return codes placed in the response:
 * <ul>
 *   <li>TABLE_NOT_FOUND - the index row does not exist or the table cannot be loaded;</li>
 *   <li>UNALLOWED_TABLE_MUTATION - the transition is not permitted, or the index state was
 *       updated after the disable timestamp supplied by the caller;</li>
 *   <li>TABLE_ALREADY_EXISTS - success, or a DISABLE to PENDING_DISABLE request, which is
 *       ignored so that the client does not throw.</li>
 * </ul>
 * Any other failure is logged and reported through {@code controller} as an IOException.
 *
 * @param controller RPC controller used to report failures
 * @param request    serialized index metadata mutations plus the client version
 * @param done       callback that receives the response
 */
public void updateIndexState(RpcController controller, UpdateIndexStateRequest request,
        RpcCallback<MetaDataResponse> done) {
    MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
    byte[] schemaName = null;
    byte[] tableName = null;
    try {
        byte[][] rowKeyMetaData = new byte[3][];
        List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
        schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
        tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
        final byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
        Region region = env.getRegion();
        MetaDataMutationResult result = checkTableKeyInRegion(key, region);
        if (result != null) {
            done.run(MetaDataMutationResult.toProto(result));
            return;
        }
        long timeStamp = HConstants.LATEST_TIMESTAMP;
        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
        // newKVs comes from the first mutation's family cell map. Cells replaced, removed or
        // appended below are therefore part of what tableMetadata writes later on.
        List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
        Cell newKV = null;
        int disableTimeStampKVIndex = -1;
        int indexStateKVIndex = 0;
        if (newKVs != null) {
            int index = 0;
            for (Cell cell : newKVs) {
                if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                        INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0) {
                    newKV = cell;
                    indexStateKVIndex = index;
                    timeStamp = cell.getTimestamp();
                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                        INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) {
                    disableTimeStampKVIndex = index;
                }
                index++;
            }
        }
        if (newKV == null) {
            // Fail with a descriptive error rather than an NPE when the request has no state.
            throw new IOException("Index state update for " + Bytes.toStringBinary(key)
                    + " does not contain an INDEX_STATE cell");
        }
        PIndexState newState =
                PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
        RowLock rowLock = acquireLock(region, key, null, false);
        if (rowLock == null) {
            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
        }
        Get get = new Get(key);
        get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
        get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
        get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
        get.addColumn(TABLE_FAMILY_BYTES, MetaDataEndpointImplConstants.ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
        // The finally clause below releases the row lock even if opening the scanner fails.
        try (RegionScanner scanner = region.getScanner(new Scan(get))) {
            List<Cell> cells = new ArrayList<>();
            scanner.next(cells);
            if (cells.isEmpty()) {
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                done.run(builder.build());
                return;
            }
            Result currentResult = Result.create(cells);
            Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
            Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
            Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
            boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, MetaDataEndpointImplConstants.ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null;
            //check permission on data table
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
            PTable loadedTable =
                    doGetTable(tenantId, schemaName, tableName, clientTimeStamp, null,
                            request.getClientVersion());
            if (loadedTable == null) {
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                done.run(builder.build());
                return;
            }
            List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>();
            requests.add(new InvalidateServerMetadataCacheRequest(tenantId, schemaName,
                    tableName));
            invalidateServerMetadataCache(requests);
            getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId),
                    SchemaUtil.getTableName(schemaName, tableName),
                    TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
                    getParentPhysicalTableName(loadedTable),
                    newState);
            if (currentStateKV == null) {
                // The row exists but carries no index state; report it instead of an NPE.
                throw new IOException("No INDEX_STATE stored for " + Bytes.toStringBinary(key));
            }
            PIndexState currentState =
                    PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                            .getValueOffset()]);
            // Timestamp of INDEX_STATE gets updated with each call
            long actualTimestamp = currentStateKV.getTimestamp();
            long curTimeStampVal = 0;
            long newDisableTimeStamp = 0;
            if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) {
                curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
                        currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
                // new DisableTimeStamp is passed in
                if (disableTimeStampKVIndex >= 0) {
                    Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
                    long expectedTimestamp = newDisableTimeStampCell.getTimestamp();
                    // If the index status has been updated after the upper bound of the scan we use
                    // to partially rebuild the index, then we need to fail the rebuild because an
                    // index write failed before the rebuild was complete.
                    if (actualTimestamp > expectedTimestamp) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                            newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
                    // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
                    // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
                    // drive the partial index rebuild rather than update it with each attempt to update the index
                    // when a new data table write occurs.
                    // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the
                    // index in which case the state will be INACTIVE or PENDING_ACTIVE.
                    if (curTimeStampVal != 0
                            && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.PENDING_DISABLE)
                            && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
                        // do not reset disable timestamp as we want to keep the min
                        newKVs.remove(disableTimeStampKVIndex);
                        if (disableTimeStampKVIndex < indexStateKVIndex) {
                            // Removing an earlier cell shifts INDEX_STATE one slot to the left.
                            indexStateKVIndex--;
                        }
                        disableTimeStampKVIndex = -1;
                    }
                }
            }
            // Detect invalid transitions
            if (currentState == PIndexState.BUILDING) {
                if (newState == PIndexState.USABLE) {
                    builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    done.run(builder.build());
                    return;
                }
            } else if (currentState == PIndexState.DISABLE) {
                // Index already disabled, so can't revert to PENDING_DISABLE
                if (newState == PIndexState.PENDING_DISABLE) {
                    // returning TABLE_ALREADY_EXISTS here means the client doesn't throw an exception
                    builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    done.run(builder.build());
                    return;
                }
                // Can't transition back to INACTIVE if INDEX_DISABLE_TIMESTAMP is 0
                if (newState != PIndexState.BUILDING && newState != PIndexState.DISABLE &&
                        (newState != PIndexState.INACTIVE || curTimeStampVal == 0)) {
                    builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    done.run(builder.build());
                    return;
                }
                // Done building, but was disable before that, so that in disabled state
                if (newState == PIndexState.ACTIVE) {
                    newState = PIndexState.DISABLE;
                }
            }
            if (newState == PIndexState.PENDING_DISABLE && currentState != PIndexState.PENDING_DISABLE
                    && currentState != PIndexState.INACTIVE) {
                // reset count for first PENDING_DISABLE
                newKVs.add(PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L)));
            }
            // PENDING_DISABLE -> DISABLE needs no special handling here: the generic counter
            // reset below already covers DISABLE.
            if (currentState == PIndexState.PENDING_DISABLE && newState == PIndexState.ACTIVE) {
                // Before making the index ACTIVE, check whether all clients succeeded; otherwise
                // keep it PENDING_DISABLE. The counter column must be requested explicitly,
                // because the Get above only asks for the state/timestamp columns.
                Get countGet = new Get(key);
                countGet.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
                byte[] count;
                try (RegionScanner countScanner = region.getScanner(new Scan(countGet))) {
                    List<Cell> countCells = new ArrayList<>();
                    countScanner.next(countCells);
                    count = Result.create(countCells)
                            .getValue(TABLE_FAMILY_BYTES,
                                    PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
                }
                if (count != null && Bytes.toLong(count) != 0) {
                    newState = PIndexState.PENDING_DISABLE;
                    // Only drop the disable timestamp if the client actually sent one.
                    if (disableTimeStampKVIndex >= 0) {
                        newKVs.remove(disableTimeStampKVIndex);
                        if (disableTimeStampKVIndex < indexStateKVIndex) {
                            indexStateKVIndex--;
                        }
                    }
                    newKVs.set(indexStateKVIndex, PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                            INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                } else if (disableTimeStampKVIndex == -1) { // clear disableTimestamp if client didn't pass it in
                    newKVs.add(PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                            PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, timeStamp, PLong.INSTANCE.toBytes(0)));
                    disableTimeStampKVIndex = newKVs.size() - 1;
                }
            }
            if (newState == PIndexState.ACTIVE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.DISABLE) {
                newKVs.add(PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L)));
            }
            if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
                timeStamp = currentStateKV.getTimestamp();
            }
            if ((currentState == PIndexState.ACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) {
                newState = PIndexState.INACTIVE;
                newKVs.set(indexStateKVIndex, PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
            } else if ((currentState == PIndexState.INACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) {
                // Don't allow manual state change to USABLE (i.e. ACTIVE) if non zero INDEX_DISABLE_TIMESTAMP
                if (curTimeStampVal != 0) {
                    newState = currentState;
                } else {
                    newState = PIndexState.ACTIVE;
                }
                newKVs.set(indexStateKVIndex, PhoenixKeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
            }
            PTable returnTable = null;
            if (currentState != newState || disableTimeStampKVIndex != -1) {
                // make a copy of tableMetadata so we can add to it
                tableMetadata = new ArrayList<Mutation>(tableMetadata);
                // Always include the empty column value at latest timestamp so
                // that clients pull over update.
                Put emptyValue = new Put(key);
                emptyValue.addColumn(TABLE_FAMILY_BYTES,
                        QueryConstants.EMPTY_COLUMN_BYTES,
                        HConstants.LATEST_TIMESTAMP,
                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                tableMetadata.add(emptyValue);
                byte[] dataTableKey = null;
                if (dataTableKV != null) {
                    dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, CellUtil.cloneValue(dataTableKV));
                    // insert an empty KV to trigger time stamp update on data table row
                    Put p = new Put(dataTableKey);
                    p.addColumn(TABLE_FAMILY_BYTES,
                            QueryConstants.EMPTY_COLUMN_BYTES,
                            HConstants.LATEST_TIMESTAMP,
                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                    tableMetadata.add(p);
                }
                boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
                // We're starting a rebuild of the index, so add our rowKeyOrderOptimizable cell
                // so that the row keys get generated using the new row key format
                if (setRowKeyOrderOptimizableCell) {
                    UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp);
                }
                // We are updating the state of an index, so update the DDL timestamp.
                long serverTimestamp = EnvironmentEdgeManager.currentTimeMillis();
                tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(
                        key, clientTimeStamp, serverTimestamp));
                // Null-safe: compare from the constant side.
                boolean updateCatalogIndexes = !PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA
                        .equalsIgnoreCase(Bytes.toString(schemaName));
                mutateRowsWithLocks(this.accessCheckEnabled, env, region, tableMetadata,
                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE,
                        HConstants.NO_NONCE, updateCatalogIndexes);
                // Invalidate from cache
                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                        GlobalCache.getInstance(this.env).getMetaDataCache();
                metaDataCache.invalidate(cacheKey);
                if (dataTableKey != null) {
                    metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
                }
                if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
                        || currentState.isDisabled() || newState == PIndexState.BUILDING) {
                    returnTable = doGetTable(tenantId, schemaName, tableName,
                            HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion());
                }
            }
            // Get client timeStamp from mutations, since it may get updated by the
            // mutateRowsWithLocks call
            long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
            builder.setMutationTime(currentTime);
            if (returnTable != null) {
                builder.setTable(PTableImpl.toProto(returnTable, request.getClientVersion()));
            }
            done.run(builder.build());
        } finally {
            rowLock.release();
        }
    } catch (Throwable t) {
        LOGGER.error("updateIndexState failed", t);
        ProtobufUtil.setControllerException(controller,
                ClientUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
    }
}