public MutationPlan compile()

in phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java [478:685]


    /**
     * Compiles a parsed DELETE statement into a {@link MutationPlan}.
     * <p>
     * The delete is first rewritten as a SELECT over the target table that projects the primary
     * key columns plus any indexed non-PK columns needed by indexes maintained on the client.
     * That SELECT is compiled and optimized, and one of three plans is returned:
     * <ol>
     * <li><b>No query required</b> - there is no LIMIT and no subquery rewrite, the optimizer
     * produced exactly one plan for the data table plus one per client-side maintained index,
     * and every plan is a point lookup whose scan has no filter other than a
     * {@code SkipScanFilter}. Returns a {@code MultiRowDeleteMutationPlan} wrapping one
     * {@code SingleRowDeleteMutationPlan} per query plan.</li>
     * <li><b>Run on server</b> - auto commit is on, there is no LIMIT and no subquery rewrite,
     * the table is not transactional, there are no client-side maintained indexes, server side
     * delete mutations are enabled in the configuration, and the first optimized plan does not
     * target an index table. Returns a {@code ServerSelectDeleteMutationPlan} driven by an
     * ungrouped aggregate scan flagged with {@code DELETE_AGG}.</li>
     * <li><b>Client side select + delete</b> - everything else. Returns a
     * {@code ClientSelectDeleteMutationPlan}.</li>
     * </ol>
     * Note: {@code statement} is a member of the enclosing class and is not declared in this
     * method.
     *
     * @param delete the parsed DELETE statement to compile
     * @param returnResult only consulted on the server-side path: when it is
     *            {@code MutationState.ReturnResult.ROW} and the scan is a single point lookup,
     *            the scan is additionally flagged with {@code SINGLE_ROW_DELETE}
     * @return the mutation plan that will perform the delete
     * @throws SQLException if the target is a read-only view (a {@code ReadOnlyTableException}
     *             is thrown), if the target table is transactional while the connection has an
     *             SCN set ({@code CANNOT_SPECIFY_SCN_FOR_TXN_TABLE}), or if resolving,
     *             compiling or optimizing the underlying query fails
     */
    public MutationPlan compile(DeleteStatement delete, MutationState.ReturnResult returnResult)
            throws SQLException {
        final PhoenixConnection connection = statement.getConnection();
        final boolean isAutoCommit = connection.getAutoCommit();
        // A LIMIT on the DELETE is treated as post processing
        final boolean hasPostProcessing = delete.getLimit() != null;
        final ConnectionQueryServices services = connection.getQueryServices();
        List<QueryPlan> queryPlans;
        // Configuration switch that allows (or forbids) the server-side delete path below
        boolean allowServerMutations =
                services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_DELETE_MUTATIONS,
                        QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_DELETE_MUTATIONS);
        NamedTableNode tableNode = delete.getTable();
        String tableName = tableNode.getName().getTableName();
        String schemaName = tableNode.getName().getSchemaName();
        SelectStatement select = null;
        ColumnResolver resolverToBe = null;
        DeletingParallelIteratorFactory parallelIteratorFactoryToBe;
        resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
        // The first resolved table is the target of the delete
        final TableRef targetTableRef = resolverToBe.getTables().get(0);
        PTable table = targetTableRef.getTable();
        // Cannot update:
        // - read-only VIEW
        // - transactional table with a connection having an SCN
        // TODO: SchemaUtil.isReadOnly(PTable, connection)?
        if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
            throw new ReadOnlyTableException(schemaName,tableName);
        }
        else if (table.isTransactional() && connection.getSCN() != null) {
           throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
           .setTableName(tableName).build().buildException();
        }

        List<PTable> clientSideIndexes = getClientSideMaintainedIndexes(targetTableRef);
        final boolean hasClientSideIndexes = !clientSideIndexes.isEmpty();

        // Count the leading PK columns that are not selected explicitly: one each when the
        // table is salted, when it is multi-tenant and the connection has a tenant id, and when
        // the table has a view index id.
        boolean isSalted = table.getBucketNum() != null;
        boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
        boolean isSharedViewIndex = table.getViewIndexId() != null;
        int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
        final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset;
        // selectColumnCount is only used below as an expected-size hint for the collections
        int selectColumnCount = pkColumnCount;
        for (PTable index : clientSideIndexes) {
            selectColumnCount += index.getPKColumns().size() - pkColumnCount;
        }
        // LinkedHashSet: keeps insertion order (used later to assign positions) and de-duplicates
        Set<PColumn> projectedColumns = new LinkedHashSet<PColumn>(selectColumnCount + pkColumnOffset);
        List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(selectColumnCount);
        // Leading PK columns after the salt column (if any) are part of the projected table
        // but are not added to the select list.
        for (int i = isSalted ? 1 : 0; i < pkColumnOffset; i++) {
            PColumn column = table.getPKColumns().get(i);
            projectedColumns.add(column);
        }
        // The remaining PK columns are both projected and selected. The column name is wrapped
        // in double quotes when building the parse node (presumably to keep it case sensitive).
        for (int i = pkColumnOffset; i < table.getPKColumns().size(); i++) {
            PColumn column = table.getPKColumns().get(i);
            projectedColumns.add(column);
            aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
        }
        // Project all non PK indexed columns so that we can do the proper index maintenance on the indexes for which
        // mutations are generated on the client side. Indexed columns are needed to identify index rows to be deleted
        for (PTable index : table.getIndexes()) {
            if (isMaintainedOnClient(index)) {
                IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
                // Go through maintainer as it handles functional indexes correctly
                for (Pair<String, String> columnInfo : maintainer.getIndexedColumnInfo()) {
                    String familyName = columnInfo.getFirst();
                    // Entries without a family name are skipped here (presumably PK columns,
                    // which were already projected above)
                    if (familyName != null) {
                        String columnName = columnInfo.getSecond();
                        // With no column families on the table, look the column up by name only
                        // and leave the column reference unqualified
                        boolean hasNoColumnFamilies = table.getColumnFamilies().isEmpty();
                        PColumn column = hasNoColumnFamilies ? table.getColumnForColumnName(columnName) : table.getColumnFamily(familyName).getPColumnForColumnName(columnName);
                        // Several indexes may cover the same column; select it only once
                        if (!projectedColumns.contains(column)) {
                            projectedColumns.add(column);
                            aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(hasNoColumnFamilies ? null : TableName.create(null, familyName), '"' + columnName + '"', null)));
                        }
                    }
                }
            }
        }
        // Build the SELECT equivalent of the DELETE, carrying over its hint, WHERE, ORDER BY,
        // LIMIT, bind count and UDF parse nodes
        select = FACTORY.select(delete.getTable(), delete.getHint(), false, aliasedNodes, delete.getWhere(),
                Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null,
                delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(),
                delete.getUdfParseNodes());
        select = StatementNormalizer.normalize(select, resolverToBe);

        // Identity comparison: getting back a different instance is taken to mean that the
        // subquery rewriter changed the statement, which counts as pre processing
        SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
        boolean hasPreProcessing = transformedSelect != select;
        if (transformedSelect != select) {
            // The rewritten statement needs its own resolver and has to be normalized again
            resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName());
            select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
        }
        final boolean hasPreOrPostProcessing = hasPreProcessing || hasPostProcessing;
        // Starts out optimistic; narrowed further once the query plans are known
        boolean noQueryReqd = !hasPreOrPostProcessing;
        // No limit and no sub queries, joins, etc in where clause
        // Can't run on same server for transactional data, as we need the row keys for the data
        // that is being deleted for conflict detection purposes.
        // If we have immutable indexes, we'd increase the number of bytes scanned by executing
        // separate queries against each index, so better to drive from a single table in that case.
        boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasClientSideIndexes && allowServerMutations;
        HintNode hint = delete.getHint();
        // For a server side delete, add the USE_DATA_OVER_INDEX_TABLE hint unless the user
        // explicitly hinted USE_INDEX_OVER_DATA_TABLE
        if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
            select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE));
        }

        // The deleting iterator factory is only used when there is no pre or post processing
        parallelIteratorFactoryToBe = hasPreOrPostProcessing ? null : new DeletingParallelIteratorFactory(connection);
        QueryOptimizer optimizer = new QueryOptimizer(services);
        QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement));
        final QueryPlan dataPlan = compiler.compile();
        // TODO: the select clause should know that there's a sub query, but doesn't seem to currently
        // With client-side maintained indexes ask for all applicable plans, otherwise only for
        // the best one
        queryPlans = Lists.newArrayList(!clientSideIndexes.isEmpty()
                ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)
                : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe));

        // The server side path is abandoned if the first plan is driven from an index table
        runOnServer &= queryPlans.get(0).getTableRef().getTable().getType() != PTableType.INDEX;

        // We need to have all indexed columns available in all immutable indexes in order
        // to generate the delete markers from the query. We also cannot have any filters
        // except for our SkipScanFilter for point lookups.
        // A simple check of the non existence of a where clause in the parse node is not sufficient, as the where clause
        // may have been optimized out. Instead, we check that there's a single SkipScanFilter
        // If we can generate a plan for every index, that means all the required columns are available in every index,
        // hence we can drive the delete from any of the plans.
        noQueryReqd &= queryPlans.size() == 1 + clientSideIndexes.size();
        int queryPlanIndex = 0;
        // Every plan must be a point lookup with either no filter or only a SkipScanFilter;
        // the loop stops at the first plan that does not qualify
        while (noQueryReqd && queryPlanIndex < queryPlans.size()) {
            QueryPlan plan = queryPlans.get(queryPlanIndex++);
            StatementContext context = plan.getContext();
            noQueryReqd &= (!context.getScan().hasFilter()
                    || context.getScan().getFilter() instanceof SkipScanFilter)
                && context.getScanRanges().isPointLookup();
        }

        // Mutation size limits (configured max size and max size in bytes) handed to whichever
        // plan is created below
        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
        final long maxSizeBytes = services.getProps()
                .getLongBytes(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
                        QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);

        // If every plan is a pure point lookup (see the checks above), we don't need to run a
        // query against the server to find the rows to delete.
        if (noQueryReqd) {
            // Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows
            // from the data table, while the others will be for deleting rows from immutable indexes.
            List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
            for (final QueryPlan plan : queryPlans) {
                mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
            }
            return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
        } else if (runOnServer) {
            // TODO: better abstraction
            final StatementContext context = dataPlan.getContext();
            Scan scan = context.getScan();
            // Flag the scan so that the server side performs the delete
            scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG, QueryConstants.TRUE);
            // Exactly one point lookup and the caller asked for the row back: additionally
            // flag the scan as a single row delete
            if (context.getScanRanges().getPointLookupCount() == 1 &&
                    returnResult == MutationState.ReturnResult.ROW) {
                scan.setAttribute(BaseScannerRegionObserverConstants.SINGLE_ROW_DELETE,
                        QueryConstants.TRUE);
            }

            // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
            // The coprocessor will delete each row returned from the scan
            // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
            SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
            RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
            // Carry the data plan's "project every row" setting over to the aggregate projector
            if (dataPlan.getProjector().projectEveryRow()) {
                projectorToBe = new RowProjector(projectorToBe,true);
            }
            final RowProjector projector = projectorToBe;
            final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, dataPlan);
            return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes);
        } else {
            // Client side path: run the select on the client and generate the deletes from
            // its results
            final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
            // Wrap each projected column so that its position is its index in projectedColumns,
            // shifted by one when the table is salted
            List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size());
            final int offset = table.getBucketNum() == null ? 0 : 1;
            Iterator<PColumn> projectedColsItr = projectedColumns.iterator();
            int i = 0;
            while (projectedColsItr.hasNext()) {
                final int position = i++;
                adjustedProjectedColumns.add(new DelegateColumn(projectedColsItr.next()) {
                    @Override
                    public int getPosition() {
                        return position + offset;
                    }
                });
            }
            // A PROJECTED copy of the data table restricted to the columns selected above
            PTable projectedTable = PTableImpl.builderWithColumns(table, adjustedProjectedColumns)
                    .setType(PTableType.PROJECTED)
                    .build();
            final TableRef projectedTableRef = new TableRef(projectedTable, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp());

            // Drive the delete from the first plan whose table is not in the BUILDING index
            // state; fall back to the data plan if there is none
            QueryPlan bestPlanToBe = dataPlan;
            for (QueryPlan plan : queryPlans) {
                PTable planTable = plan.getTableRef().getTable();
                if (planTable.getIndexState() != PIndexState.BUILDING) {
                    bestPlanToBe = plan;
                    break;
                }
            }
            final QueryPlan bestPlan = bestPlanToBe;
            // The "other" tables are every client-side maintained index except the one the
            // best plan is driven from ...
            final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(clientSideIndexes.size());
            for (PTable index : clientSideIndexes) {
                if (!bestPlan.getTableRef().getTable().equals(index)) {
                    otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()));
                }
            }

            // ... plus the projected data table when the best plan is not driven from the
            // data table itself
            if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
                otherTableRefs.add(projectedTableRef);
            }
            return new ClientSelectDeleteMutationPlan(targetTableRef, dataPlan, bestPlan, hasPreOrPostProcessing,
                    parallelIteratorFactory, otherTableRefs, projectedTableRef, maxSize, maxSizeBytes, connection);
        }
    }