public MutationState createIndex()

in phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java [1634:1962]


    public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
        IndexKeyConstraint ik = statement.getIndexConstraint();
        TableName indexTableName = statement.getIndexTableName();

        Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(statement.getProps().size());
        Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(statement.getProps().size() + 1);
        populatePropertyMaps(statement.getProps(), tableProps, commonFamilyProps, PTableType.INDEX,
                CDCUtil.isCDCIndex(SchemaUtil
                        .getTableNameFromFullName(statement.getIndexTableName().toString())));
        List<Pair<ParseNode, SortOrder>> indexParseNodeAndSortOrderList = ik.getParseNodeAndSortOrderList();
        List<ColumnName> includedColumns = statement.getIncludeColumns();
        TableRef tableRef = null;
        PTable table = null;
        boolean allocateIndexId = false;
        boolean isLocalIndex = statement.getIndexType() == IndexType.LOCAL;
        int hbaseVersion = connection.getQueryServices().getLowestClusterHBaseVersion();
        if (isLocalIndex) {
            if (!connection.getQueryServices().getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX)) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNALLOWED_LOCAL_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
            }
            if (!connection.getQueryServices().supportsFeature(Feature.LOCAL_INDEX)) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_LOCAL_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
            }
        }
        Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
        String physicalSchemaName = null;
        String physicalTableName = null;
        PTable dataTable = null;
        try {
            ColumnResolver resolver
                    = FromCompiler.getResolverForCreateIndex(
                            statement, connection, statement.getUdfParseNodes());
            tableRef = resolver.getTables().get(0);
            Date asyncCreatedDate = null;
            if (statement.isAsync()) {
                asyncCreatedDate = new Date(tableRef.getCurrentTime());
            }
            dataTable = tableRef.getTable();
            boolean isTenantConnection = connection.getTenantId() != null;
            if (isTenantConnection) {
                if (dataTable.getType() != PTableType.VIEW) {
                    throw new SQLFeatureNotSupportedException("An index may only be created for a VIEW through a tenant-specific connection");
                }
            }
            if (!dataTable.isImmutableRows()) {
                if (hbaseVersion < MetaDataProtocol.MUTABLE_SI_VERSION_THRESHOLD) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NO_MUTABLE_INDEXES).setTableName(indexTableName.getTableName()).build().buildException();
                }
                if (!connection.getQueryServices().hasIndexWALCodec() && !dataTable.isTransactional()) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
                }
                boolean tableWithRowTimestampCol = dataTable.getRowTimestampColPos() != -1;
                if (tableWithRowTimestampCol) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP).setTableName(indexTableName.getTableName()).build().buildException();
                }
            }
            if (dataTable.isTransactional()
                    && isLocalIndex
                    && dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE).setMessage(dataTable.getTransactionProvider().name()).setTableName(indexTableName.getTableName()).build().buildException();
            }
            int posOffset = 0;
            List<PColumn> pkColumns = dataTable.getPKColumns();
            Set<RowKeyColumnExpression> unusedPkColumns;
            if (dataTable.getBucketNum() != null) { // Ignore SALT column
                unusedPkColumns = Sets.newLinkedHashSetWithExpectedSize(pkColumns.size()-1);
                posOffset++;
            } else {
                unusedPkColumns = Sets.newLinkedHashSetWithExpectedSize(pkColumns.size());
            }
            for (int i = posOffset; i < pkColumns.size(); i++) {
                PColumn column = pkColumns.get(i);
                unusedPkColumns.add(new RowKeyColumnExpression(column, new RowKeyValueAccessor(pkColumns, i), "\""+column.getName().getString()+"\""));
            }
            List<ColumnDefInPkConstraint> allPkColumns = Lists.newArrayListWithExpectedSize(unusedPkColumns.size());
            List<ColumnDef> columnDefs = Lists.newArrayListWithExpectedSize(includedColumns.size() + indexParseNodeAndSortOrderList.size());

            /*
             * Allocate an index ID in two circumstances:
             * 1) for a local index, as all local indexes will reside in the same HBase table
             * 2) for a view on an index.
             */
            if (isLocalIndex || (dataTable.getType() == PTableType.VIEW && dataTable.getViewType() != ViewType.MAPPED)) {
                allocateIndexId = true;
                PDataType dataType = getViewIndexDataType();
                ColumnName colName = ColumnName.caseSensitiveColumnName(MetaDataUtil.getViewIndexIdColumnName());
                allPkColumns.add(new ColumnDefInPkConstraint(colName, SortOrder.getDefault(), false));
                columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), false, null, null, false, SortOrder.getDefault(), null, false));
            }

            if (dataTable.isMultiTenant()) {
                PColumn col = dataTable.getPKColumns().get(posOffset);
                RowKeyColumnExpression columnExpression = new RowKeyColumnExpression(col, new RowKeyValueAccessor(pkColumns, posOffset), col.getName().getString());
                unusedPkColumns.remove(columnExpression);
                PDataType dataType = IndexUtil.getIndexColumnDataType(col);
                ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
                allPkColumns.add(new ColumnDefInPkConstraint(colName, col.getSortOrder(), false));
                columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, SortOrder.getDefault(), col.getName().getString(), col.isRowTimestamp()));
            }

            PhoenixStatement phoenixStatment = new PhoenixStatement(connection);
            StatementContext context = new StatementContext(phoenixStatment, resolver);
            IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context);
            Set<ColumnName> indexedColumnNames = Sets.newHashSetWithExpectedSize(indexParseNodeAndSortOrderList.size());
            for (Pair<ParseNode, SortOrder> pair : indexParseNodeAndSortOrderList) {
                ParseNode parseNode = pair.getFirst();
                // normalize the parse node
                parseNode = StatementNormalizer.normalize(parseNode, resolver);
                // compile the parseNode to get an expression
                expressionIndexCompiler.reset();
                Expression expression = parseNode.accept(expressionIndexCompiler);
                if (expressionIndexCompiler.isAggregate()) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                }
                if (expressionIndexCompiler.isJsonFragment()) {
                    throw new SQLExceptionInfo.Builder(
                            SQLExceptionCode.JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION).build()
                            .buildException();
                }
                if (!(expression.getDeterminism() == Determinism.ALWAYS || expression.getDeterminism() == Determinism.PER_ROW)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                }
                if (expression.isStateless()) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                }
                unusedPkColumns.remove(expression);

                // Go through parse node to get string as otherwise we
                // can lose information during compilation
                StringBuilder buf = new StringBuilder();
                parseNode.toSQL(resolver, buf);
                // need to escape backslash as this expression will be re-parsed later
                String expressionStr = StringUtil.escapeBackslash(buf.toString());

                ColumnName colName = null;
                ColumnRef colRef = expressionIndexCompiler.getColumnRef();
                boolean isRowTimestamp = false;
                if (colRef!=null) {
                    // if this is a regular column
                    PColumn column = colRef.getColumn();
                    String columnFamilyName = column.getFamilyName()!=null ? column.getFamilyName().getString() : null;
                    colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(columnFamilyName, column.getName().getString()));
                    isRowTimestamp = column.isRowTimestamp();
                }
                else {
                    // if this is an expression
                    // TODO column names cannot have double quotes, remove this once this PHOENIX-1621 is fixed
                    String name = expressionStr.replaceAll("\"", "'");
                    colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, name));
                }
                indexedColumnNames.add(colName);
                PDataType dataType = IndexUtil.getIndexColumnDataType(expression.isNullable(), expression.getDataType());
                allPkColumns.add(new ColumnDefInPkConstraint(colName, pair.getSecond(), isRowTimestamp));
                columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(), expression.isNullable(), expression.getMaxLength(), expression.getScale(), false, pair.getSecond(), expressionStr, isRowTimestamp));
            }

            // Next all the PK columns from the data table that aren't indexed
            if (!unusedPkColumns.isEmpty()) {
                for (RowKeyColumnExpression colExpression : unusedPkColumns) {
                    PColumn col = dataTable.getPKColumns().get(colExpression.getPosition());
                    // Don't add columns with constant values from updatable views, as
                    // we don't need these in the index
                    if (col.getViewConstant() == null) {
                        ColumnName colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
                        allPkColumns.add(new ColumnDefInPkConstraint(colName, colExpression.getSortOrder(), col.isRowTimestamp()));
                        PDataType dataType = IndexUtil.getIndexColumnDataType(colExpression.isNullable(), colExpression.getDataType());
                        columnDefs.add(FACTORY.columnDef(colName, dataType.getSqlTypeName(),
                                colExpression.isNullable(), colExpression.getMaxLength(), colExpression.getScale(),
                                false, colExpression.getSortOrder(), colExpression.toString(), col.isRowTimestamp()));
                    }
                }
            }

            // Last all the included columns (minus any PK columns)
            for (ColumnName colName : includedColumns) {
                PColumn col = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();
                colName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(col));
                // Check for duplicates between indexed and included columns
                if (indexedColumnNames.contains(colName)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_EXIST_IN_DEF).build().buildException();
                }
                if (!SchemaUtil.isPKColumn(col) && col.getViewConstant() == null) {
                    // Need to re-create ColumnName, since the above one won't have the column family name
                    colName = ColumnName.caseSensitiveColumnName(isLocalIndex?IndexUtil.getLocalIndexColumnFamily(col.getFamilyName().getString()):col.getFamilyName().getString(), IndexUtil.getIndexColumnName(col));
                    columnDefs.add(FACTORY.columnDef(colName, col.getDataType().getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder(), col.getExpressionStr(), col.isRowTimestamp()));
                }
            }

            Configuration config = connection.getQueryServices().getConfiguration();
            // Do descendant view validation only when enabled and not a SYSTEM table/index
            if (!connection.getQueryServices().getProps()
                .getBoolean(DISABLE_VIEW_SUBTREE_VALIDATION,
                    DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION) &&
                    !QueryConstants.SYSTEM_SCHEMA_NAME.equals(dataTable.getSchemaName().getString())) {
                verifyIfDescendentViewsExtendPk(dataTable, config);
            }
            // for view indexes
            if (dataTable.getType() == PTableType.VIEW) {
                String physicalName = dataTable.getPhysicalName().getString();
                physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
                physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
                List<ColumnName> requiredCols = Lists.newArrayList(indexedColumnNames);
                requiredCols.addAll(includedColumns);
                for (ColumnName colName : requiredCols) {
                    // acquire the mutex using the global physical table name to
                    // prevent this column from being dropped while the view is being created
                    String colNameSeparatedByDot = colName.getColumnName()
                            .replace(QueryConstants.NAMESPACE_SEPARATOR,
                                     QueryConstants.NAME_SEPARATOR);
                    // indexed column name have a ':' between the column family and column name
                    // We would like to have '.' like in other column names
                    boolean acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
                            colNameSeparatedByDot);
                    if (!acquiredMutex) {
                        throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
                    }
                    acquiredColumnMutexSet.add(colNameSeparatedByDot);
                }
            }

            long threshold = Long.parseLong(config.get(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD));

            if (threshold > 0 && !statement.isAsync()) {
                Set<String> columnFamilies = new HashSet<>();
                for (ColumnDef column : columnDefs){
                    try {
                        String columnFamily = IndexUtil
                                .getDataColumnFamilyName(column.getColumnDefName().getColumnName());
                        columnFamilies.add(!columnFamily.equals("") ? columnFamily
                                : dataTable.getDefaultFamilyName()!= null ?
                                        dataTable.getDefaultFamilyName().toString()
                                        : QueryConstants.DEFAULT_COLUMN_FAMILY);
                    } catch (Exception ignored){
                        ; // We ignore any exception during this phase
                    }
                }
                long estimatedBytes = 0;
                for (String colFamily : columnFamilies) {
                    GuidePostsInfo gps = connection.getQueryServices().getTableStats(
                            new GuidePostsKey(Bytes.toBytes(tableRef.getTable().toString()),
                                    Bytes.toBytes(colFamily)));
                    long[] byteCounts = gps.getByteCounts();
                    for (long byteCount : byteCounts) {
                        estimatedBytes += byteCount;
                    }

                    if (threshold < estimatedBytes) {
                        throw new SQLExceptionInfo
                                .Builder(SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD)
                                .build().buildException();
                    }
                }
            }

            // Set DEFAULT_COLUMN_FAMILY_NAME of index to match data table
            // We need this in the props so that the correct column family is created
            if (dataTable.getDefaultFamilyName() != null && dataTable.getType() != PTableType.VIEW && !allocateIndexId) {
                statement.getProps().put("", new Pair<String,Object>(DEFAULT_COLUMN_FAMILY_NAME,dataTable.getDefaultFamilyName().getString()));
            }
            PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);

            tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getPhysicalName().getString());
            CreateTableStatement tableStatement = FACTORY.createTable(
                    indexTableName,
                    statement.getProps(),
                    columnDefs,
                    pk,
                    statement.getSplitNodes(),
                    PTableType.INDEX,
                    statement.ifNotExists(),
                    null,
                    statement.getWhere(),
                    statement.getBindCount(),
                    null
            );
            table = createTableInternal(
                    tableStatement,
                    splits,
                    dataTable,
                    null,
                    null,
                    getViewIndexDataType(),
                    null,
                    null,
                    null,
                    allocateIndexId,
                    statement.getIndexType(),
                    asyncCreatedDate,
                    null,
                    tableProps,
                    commonFamilyProps
            );
        }
        finally {
            deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet);
        }
        if (table == null) {
            return new MutationState(0, 0, connection);
        }

        if (LOGGER.isInfoEnabled()) LOGGER.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp());
        boolean asyncIndexBuildEnabled = connection.getQueryServices().getProps().getBoolean(
                QueryServices.INDEX_ASYNC_BUILD_ENABLED,
                QueryServicesOptions.DEFAULT_INDEX_ASYNC_BUILD_ENABLED);
        // In async process, we return immediately as the MR job needs to be triggered .
        if (statement.isAsync() && asyncIndexBuildEnabled) {
            return new MutationState(0, 0, connection);
        }

        // If we create index in create_disabled state, we will build them later
        if (table.getIndexState() == PIndexState.CREATE_DISABLE) {
            return new MutationState(0, 0, connection);
        }

        // If our connection is at a fixed point-in-time, we need to open a new
        // connection so that our new index table is visible.
        if (connection.getSCN() != null) {
            return buildIndexAtTimeStamp(table, statement.getTable());
        }

        MutationState state = buildIndex(table, tableRef);
        // If client is validating LAST_DDL_TIMESTAMPS, parent's last_ddl_timestamp changed
        // so remove it from client's cache. It will be refreshed when table is accessed next time.
        if (ValidateLastDDLTimestampUtil.getValidateLastDdlTimestampEnabled(connection)) {
            connection.removeTable(connection.getTenantId(), dataTable.getName().getString(),
                    null, dataTable.getTimeStamp());
        }
        return state;
    }