in phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java [1634:1962]
/**
 * Creates a secondary index described by a parsed CREATE INDEX statement.
 * <p>
 * The method works in four phases:
 * <ol>
 *   <li>Validates that the index may be created at all: local-index configuration and
 *       cluster support, tenant-connection restrictions, mutable-index prerequisites and
 *       transaction-provider support for local indexes.</li>
 *   <li>Derives the index table's PK constraint and column definitions, in this order:
 *       the view index id column (when an index id is allocated), the data table PK column
 *       at the first non-salt position (multi-tenant tables only), the indexed
 *       expressions, the remaining data table PK columns, and finally the INCLUDE
 *       columns (non-PK only).</li>
 *   <li>For indexes on views, acquires a mutex cell per referenced column on the physical
 *       table, optionally enforces the synchronous-build size threshold, and creates the
 *       index table through {@code createTableInternal}. Acquired mutex cells are always
 *       released in a {@code finally} block.</li>
 *   <li>Populates the index via {@code buildIndex} or {@code buildIndexAtTimeStamp}, unless
 *       one of the early-return conditions listed under "returns" applies.</li>
 * </ol>
 * Side effect: when the data table has a default column family and no index id is
 * allocated, an entry is added to {@code statement.getProps()}.
 *
 * @param statement the parsed CREATE INDEX statement
 * @param splits split points handed unchanged to {@code createTableInternal}
 * @return an empty {@code MutationState} when {@code createTableInternal} returned
 *         {@code null}, when the statement is ASYNC and async index build is enabled, or
 *         when the index was created in {@code CREATE_DISABLE} state; otherwise the state
 *         returned by the index build
 * @throws SQLException if any validation fails (see the individual
 *         {@code SQLExceptionCode}s below) or table creation / index build fails
 * @throws SQLFeatureNotSupportedException if a tenant-specific connection tries to index
 *         anything other than a VIEW
 */
public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
IndexKeyConstraint ik = statement.getIndexConstraint();
TableName indexTableName = statement.getIndexTableName();
// Split the statement properties into table-level and column-family-level maps.
// The last argument tells populatePropertyMaps whether the index name denotes a CDC index.
Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
populatePropertyMaps(statement.getProps(), tableProps, commonFamilyProps, PTableType.INDEX,
CDCUtil.isCDCIndex(SchemaUtil
.getTableNameFromFullName(statement.getIndexTableName().toString())));
List<Pair<ParseNode, SortOrder>> indexParseNodeAndSortOrderList = ik.getParseNodeAndSortOrderList();
List<ColumnName> includedColumns = statement.getIncludeColumns();
TableRef tableRef = null;
PTable table = null;
boolean allocateIndexId = false;
boolean isLocalIndex = statement.getIndexType() == IndexType.LOCAL;
// Compared against MUTABLE_SI_VERSION_THRESHOLD below when the data table is mutable.
int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
// A local index must be both allowed by client configuration and supported by the cluster.
if (isLocalIndex) {
if (!connection.getQueryServices().getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNALLOWED_LOCAL_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
}
if (!connection.getQueryServices().supportsFeature(Feature.LOCAL_INDEX)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_LOCAL_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
}
}
// Column mutex cells acquired inside the try block; released in the finally block.
Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
// Only assigned when the data table is a VIEW; otherwise they stay null and are passed
// as-is to deleteMutexCells together with an empty mutex set.
String physicalSchemaName = null;
String physicalTableName = null;
PTable dataTable = null;
try {
ColumnResolver resolver
= FromCompiler.getResolverForCreateIndex(
statement, connection, statement.getUdfParseNodes());
// The first resolved table is the data table the index is built on.
tableRef = resolver.getTables().get(0);
// For ASYNC indexes the creation date is taken from the resolved table's current time.
Date asyncCreatedDate = null;
if (statement.isAsync()) {
asyncCreatedDate = new Date(tableRef.getCurrentTime());
}
dataTable = tableRef.getTable();
// Tenant-specific connections may only index views.
boolean isTenantConnection = connection.getTenantId() != null;
if (isTenantConnection) {
if (dataTable.getType() != PTableType.VIEW) {
throw new SQLFeatureNotSupportedException("An index may only be created for a VIEW through a tenant-specific connection");
}
}
// Extra prerequisites that apply only when the data table is mutable.
if (!dataTable.isImmutableRows()) {
if (hbaseVersion < MetaDataProtocol.MUTABLE_SI_VERSION_THRESHOLD) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
}
// Non-transactional mutable tables require the index WAL codec to be configured.
if (!connection.getQueryServices().hasIndexWALCodec() && !dataTable.isTransactional()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
}
// Mutable tables with a ROW_TIMESTAMP column cannot be indexed.
boolean tableWithRowTimestampCol = dataTable.getRowTimestampColPos() != -1;
if (tableWithRowTimestampCol) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP).setTableName(indexTableName.getTableName()).build().buildException();
}
}
// Local indexes on transactional tables are rejected when the transaction provider
// reports the ALLOW_LOCAL_INDEX feature as unsupported.
if (dataTable.isTransactional()
&& isLocalIndex
&& dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE).setMessage(dataTable.getTransactionProvider().name()).setTableName(indexTableName.getTableName()).build().buildException();
}
// Collect the data table PK columns (insertion-ordered) that have not yet been placed in
// the index PK. Entries are removed as they are matched below; whatever remains is
// appended to the index PK after the indexed expressions.
int posOffset = 0;
List<PColumn> pkColumns = dataTable.getPKColumns();
Set<RowKeyColumnExpression> unusedPkColumns;
if (dataTable.getBucketNum() != null) { // Ignore SALT column
unusedPkColumns = Sets.newLinkedHashSetWithExpectedSize(pkColumns.size()-1);
posOffset++;
} else {
unusedPkColumns = Sets.newLinkedHashSetWithExpectedSize(pkColumns.size());
}
for (int i = posOffset; i < pkColumns.size(); i++) {
PColumn column = pkColumns.get(i);
unusedPkColumns.add(new RowKeyColumnExpression(column, new RowKeyValueAccessor(pkColumns, i), "\""+column.getName().getString()+"\""));
}
// allPkColumns becomes the index PK constraint; columnDefs holds every index column
// (PK and non-PK). Both are order-sensitive.
List<ColumnDefInPkConstraint> allPkColumns = Lists.newArrayListWithExpectedSize(unusedPkColumns.size());
List<ColumnDef> columnDefs = Lists.newArrayListWithExpectedSize(includedColumns.size() + indexParseNodeAndSortOrderList.size());
/*
 * Allocate an index ID in two circumstances:
 * 1) for a local index, as all local indexes will reside in the same HBase table
 * 2) for an index on a view (other than a MAPPED view).
 * When allocated, the view index id column is the first column of the index PK.
 */
if (isLocalIndex || (dataTable.getType() == PTableType.VIEW && dataTable.getViewType() != ViewType.MAPPED)) {
allocateIndexId = true;
PDataType dataType = getViewIndexDataType();
ColumnName colName = ColumnName.caseSensitiveColumnName(MetaDataUtil.getViewIndexIdColumnName());
allPkColumns.add(new ColumnDefInPkConstraint(colName, SortOrder.getDefault(), false));
columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), false, null, null, false, SortOrder.getDefault(), null, false));
}
// For multi-tenant tables the PK column at the first non-salt position (presumably the
// tenant id column) is carried over into the index PK right after the optional index id.
if (dataTable.isMultiTenant()) {
PColumn col = dataTable.getPKColumns().get(posOffset);
RowKeyColumnExpression columnExpression = new RowKeyColumnExpression(col, new RowKeyValueAccessor(pkColumns, posOffset), col.getName().getString());
unusedPkColumns.remove(columnExpression);
PDataType dataType = IndexUtil.getIndexColumnDataType(col);
ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
// NOTE(review): the PK constraint uses col.getSortOrder() while the column definition
// below uses SortOrder.getDefault() - confirm this asymmetry is intended.
allPkColumns.add(new ColumnDefInPkConstraint(colName, col.getSortOrder(), false));
columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, SortOrder.getDefault(), col.getName().getString(), col.isRowTimestamp()));
}
// A statement and context created only so the indexed expressions can be compiled.
PhoenixStatement phoenixStatment = new PhoenixStatement(connection);
StatementContext context = new StatementContext(phoenixStatment, resolver);
IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context);
// Names of the indexed columns; used to detect overlap with INCLUDE columns and, for
// views, to decide which column mutexes to acquire.
Set<ColumnName> indexedColumnNames = Sets.newHashSetWithExpectedSize(indexParseNodeAndSortOrderList.size());
for (Pair<ParseNode, SortOrder> pair : indexParseNodeAndSortOrderList) {
ParseNode parseNode = pair.getFirst();
// normalize the parse node
parseNode = StatementNormalizer.normalize(parseNode, resolver);
// compile the parseNode to get an expression
expressionIndexCompiler.reset();
Expression expression = parseNode.accept(expressionIndexCompiler);
// Reject expressions that cannot be indexed: aggregates, JSON fragments, expressions
// whose determinism is neither ALWAYS nor PER_ROW, and stateless expressions.
if (expressionIndexCompiler.isAggregate()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
}
if (expressionIndexCompiler.isJsonFragment()) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION).build()
.buildException();
}
if (!(expression.getDeterminism() == Determinism.ALWAYS || expression.getDeterminism() == Determinism.PER_ROW)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
}
if (expression.isStateless()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
}
// If the indexed expression is itself a data table PK column, it must not be appended
// to the index PK a second time later on.
unusedPkColumns.remove(expression);
// Go through parse node to get string as otherwise we
// can lose information during compilation
StringBuilder buf = new StringBuilder();
parseNode.toSQL(resolver, buf);
// need to escape backslash as this expression will be re-parsed later
String expressionStr = StringUtil.escapeBackslash(buf.toString());
ColumnName colName = null;
// A non-null column ref means the indexed expression is a plain column reference.
ColumnRef colRef = expressionIndexCompiler.getColumnRef();
boolean isRowTimestamp = false;
if (colRef!=null) {
// if this is a regular column
PColumn column = colRef.getColumn();
String columnFamilyName = column.getFamilyName()!=null ? column.getFamilyName().getString() : null;
colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(columnFamilyName, column.getName().getString()));
isRowTimestamp = column.isRowTimestamp();
}
else {
// if this is an expression
// TODO column names cannot have double quotes, remove this once this PHOENIX-1621 is fixed
String name = expressionStr.replaceAll("\"", "'");
colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, name));
}
indexedColumnNames.add(colName);
PDataType dataType = IndexUtil.getIndexColumnDataType(expression.isNullable(), expression.getDataType());
// The sort order requested in the statement applies to both the PK constraint entry
// and the column definition.
allPkColumns.add(new ColumnDefInPkConstraint(colName, pair.getSecond(), isRowTimestamp));
columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), expression.isNullable(), expression.getMaxLength(), expression.getScale(), false, pair.getSecond(), expressionStr, isRowTimestamp));
}
// Next all the PK columns from the data table that aren't indexed
if (!unusedPkColumns.isEmpty()) {
for (RowKeyColumnExpression colExpression : unusedPkColumns) {
PColumn col = dataTable.getPKColumns().get(colExpression.getPosition());
// Don't add columns with constant values from updatable views, as
// we don't need these in the index
if (col.getViewConstant() == null) {
ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
allPkColumns.add(new ColumnDefInPkConstraint(colName, colExpression.getSortOrder(), col.isRowTimestamp()));
PDataType dataType = IndexUtil.getIndexColumnDataType(colExpression.isNullable(), colExpression.getDataType());
columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(),
colExpression.isNullable(), colExpression.getMaxLength(), colExpression.getScale(),
false, colExpression.getSortOrder(), colExpression.toString(), col.isRowTimestamp()));
}
}
}
// Last all the included columns (minus any PK columns)
for (ColumnName colName : includedColumns) {
PColumn col = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
// Check for duplicates between indexed and included columns
if (indexedColumnNames.contains(colName)) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_EXIST_IN_DEF).build().buildException();
}
// PK columns are already part of the index PK and view constants are not stored, so
// only the remaining columns are added as non-PK index columns.
if (!SchemaUtil.isPKColumn(col) && col.getViewConstant() == null) {
// Need to re-create ColumnName, since the above one won't have the column family name
// (local indexes use a column family name derived from the data column family).
colName = ColumnName.caseSensitiveColumnName(isLocalIndex?IndexUtil.getLocalIndexColumnFamily(col.getFamilyName().getString()):col.getFamilyName().getString(), IndexUtil.getIndexColumnName(col));
columnDefs.add(FACTORY.columnDef(colName, col.getDataType().getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder(), col.getExpressionStr(), col.isRowTimestamp()));
}
}
Configuration config = connection.getQueryServices().getConfiguration();
// Do descendant view validation only when enabled and not a SYSTEM table/index
if (!connection.getQueryServices().getProps()
.getBoolean(DISABLE_VIEW_SUBTREE_VALIDATION,
DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION) &&
!QueryConstants.SYSTEM_SCHEMA_NAME.equals(dataTable.getSchemaName().getString())) {
verifyIfDescendentViewsExtendPk(dataTable, config);
}
// for view indexes
if (dataTable.getType() == PTableType.VIEW) {
String physicalName = dataTable.getPhysicalName().getString();
physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
// Every indexed and every included column needs its own mutex.
List<ColumnName> requiredCols = Lists.newArrayList(indexedColumnNames);
requiredCols.addAll(includedColumns);
for (ColumnName colName : requiredCols) {
// acquire the mutex using the global physical table name to
// prevent this column from being dropped while the index is being created
String colNameSeparatedByDot = colName.getColumnName()
.replace(QueryConstants.NAMESPACE_SEPARATOR,
QueryConstants.NAME_SEPARATOR);
// indexed column name have a ':' between the column family and column name
// We would like to have '.' like in other column names
boolean acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
colNameSeparatedByDot);
if (!acquiredMutex) {
throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
}
// Track the cell only after it was acquired so that the finally block releases
// exactly the mutexes held by this call.
acquiredColumnMutexSet.add(colNameSeparatedByDot);
}
}
// Guard against synchronously building an index over too much data: when the threshold
// is positive and the statement is not ASYNC, the estimated size of the involved column
// families must not exceed it.
// NOTE(review): Long.parseLong throws NumberFormatException if the property is unset;
// presumably a default is always registered in the configuration - confirm.
long threshold = Long.parseLong(config.get(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD));
if (threshold > 0 && !statement.isAsync()) {
// Determine the data table column families touched by the index columns, falling back
// to the table's default family (or the global default) when a column has none.
Set<String> columnFamilies = new HashSet<>();
for (ColumnDef column : columnDefs){
try {
String columnFamily = IndexUtil
.getDataColumnFamilyName(column.getColumnDefName().getColumnName());
columnFamilies.add(!columnFamily.equals("") ? columnFamily
: dataTable.getDefaultFamilyName()!= null ?
dataTable.getDefaultFamilyName().toString()
: QueryConstants.DEFAULT_COLUMN_FAMILY);
} catch (Exception ignored){
; // We ignore any exception during this phase
}
}
// Sum the guidepost byte counts per column family; the total accumulates across
// families and is checked after each one so the statement fails as soon as the
// running estimate exceeds the threshold.
long estimatedBytes = 0;
for (String colFamily : columnFamilies) {
GuidePostsInfo gps = connection.getQueryServices().getTableStats(
new GuidePostsKey(Bytes.toBytes(tableRef.getTable().toString()),
Bytes.toBytes(colFamily)));
long[] byteCounts = gps.getByteCounts();
for (long byteCount : byteCounts) {
estimatedBytes += byteCount;
}
if (threshold < estimatedBytes) {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD)
.build().buildException();
}
}
}
// Set DEFAULT_COLUMN_FAMILY_NAME of index to match data table
// We need this in the props so that the correct column family is created
// (note: this mutates the props of the caller's statement).
if (dataTable.getDefaultFamilyName() != null && dataTable.getType() != PTableType.VIEW && !allocateIndexId) {
statement.getProps().put("", new Pair<String,Object>(DEFAULT_COLUMN_FAMILY_NAME,dataTable.getDefaultFamilyName().getString()));
}
PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);
// Record the data table's physical name in the index table properties.
tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getPhysicalName().getString());
// Express the index as a CREATE TABLE statement of type INDEX and create it with the
// data table as parent.
CreateTableStatement tableStatement = FACTORY.createTable(
indexTableName,
statement.getProps(),
columnDefs,
pk,
statement.getSplitNodes(),
PTableType.INDEX,
statement.ifNotExists(),
null,
statement.getWhere(),
statement.getBindCount(),
null
);
table = createTableInternal(
tableStatement,
splits,
dataTable,
null,
null,
getViewIndexDataType(),
null,
null,
null,
allocateIndexId,
statement.getIndexType(),
asyncCreatedDate,
null,
tableProps,
commonFamilyProps
);
}
finally {
// Always release the column mutexes, whether index creation succeeded or failed.
deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet);
}
// No table was created (presumably the index already exists and IF NOT EXISTS was
// specified - confirm against createTableInternal); nothing to build.
if (table == null) {
return new MutationState(0, 0, connection);
}
if (LOGGER.isInfoEnabled()) LOGGER.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp());
boolean asyncIndexBuildEnabled = connection.getQueryServices().getProps().getBoolean(
QueryServices.INDEX_ASYNC_BUILD_ENABLED,
QueryServicesOptions.DEFAULT_INDEX_ASYNC_BUILD_ENABLED);
// In the async case we return immediately without building the index here; an MR job
// needs to be triggered to populate it.
if (statement.isAsync() && asyncIndexBuildEnabled) {
return new MutationState(0, 0, connection);
}
// If we create index in create_disabled state, we will build them later
if (table.getIndexState() == PIndexState.CREATE_DISABLE) {
return new MutationState(0, 0, connection);
}
// If our connection is at a fixed point-in-time, we need to open a new
// connection so that our new index table is visible.
// NOTE(review): this path and the two early returns above skip the parent-table cache
// invalidation performed at the end of this method - confirm that is intended.
if (connection.getSCN() != null) {
return buildIndexAtTimeStamp(table, statement.getTable());
}
MutationState state = buildIndex(table, tableRef);
// If client is validating LAST_DDL_TIMESTAMPS, parent's last_ddl_timestamp changed
// so remove it from client's cache. It will be refreshed when table is accessed next time.
if (ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(connection)) {
connection.removeTable(connection.getTenantId(), dataTable.getName().getString(),
null, dataTable.getTimeStamp());
}
return state;
}