protected void doAnalyzeDataset()

in asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java [4959:5163]


    /**
     * Builds a new sample index for an internal dataset and records the collected statistics in the metadata.
     * If a previous sample index exists, it is dropped after the new one has been committed.
     * <p>
     * Two sample index names are used alternately ({@code IndexUtil.getSampleIndexNames(datasetName)}).
     * <ul>
     * <li>If an index named {@code first} exists, the new sample is created as {@code second}.</li>
     * <li>Otherwise the new sample is created as {@code first}.</li>
     * </ul>
     * The index found under the other name, if any, is treated as the existing sample.
     * <p>
     * The work is split across several metadata transactions:
     * <ol>
     * <li>Add the new index record with {@code PENDING_ADD_OP} and build its creation job.</li>
     * <li>Run the creation job, flush the dataset, build the loading job, and run it. The loading job collects
     * statistics from the operator named {@code SampleOperationsHelper.DATASET_STATS_OPERATOR_NAME}.</li>
     * <li>Replace the pending record with a {@code PENDING_NO_OP} record that carries the collected
     * cardinality, average tuple size and per-index statistics.</li>
     * <li>If an existing sample index was found, do three things: mark it {@code PENDING_DROP_OP}, run its drop
     * job, and remove its record.</li>
     * </ol>
     * <p>
     * On failure, any active metadata transaction is aborted and compensating operations run, based on how far
     * the work progressed.
     * <ul>
     * <li>If the existing index had already been marked for drop, its drop job is re-run and its record is
     * removed.</li>
     * <li>Otherwise, if the new index's pending record had been committed, the new index artifacts are dropped
     * and its record is removed.</li>
     * </ul>
     * The original exception is then rethrown.
     *
     * @param metadataProvider
     *            provider whose metadata transaction context is switched for each transaction
     * @param stmtAnalyze
     *            the ANALYZE statement (source location, sample size, sample seed)
     * @param databaseName
     *            database of the dataset
     * @param dataverseName
     *            dataverse of the dataset
     * @param datasetName
     *            dataset to analyze
     * @param hcc
     *            Hyracks client connection used to run the jobs
     * @param requestParameters
     *            request parameters (not read by this method)
     * @throws CompilationException
     *             if the dataverse or dataset does not exist, the dataset is not internal, a job specification
     *             cannot be built, or no statistics are collected
     * @throws IllegalStateException
     *             if a compensating metadata removal fails, which leaves a pending index record behind
     * @throws Exception
     *             any other failure, rethrown after compensation
     */
    protected void doAnalyzeDataset(MetadataProvider metadataProvider, AnalyzeStatement stmtAnalyze,
            String databaseName, DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
            IRequestParameters requestParameters) throws Exception {
        SourceLocation sourceLoc = stmtAnalyze.getSourceLocation();
        // These progress markers decide which compensating operations the catch block performs.
        ProgressState progressNewIndexCreate = ProgressState.NO_PROGRESS;
        ProgressState progressExistingIndexDrop = ProgressState.NO_PROGRESS;
        Dataset ds = null;
        Index existingIndex = null, newIndexPendingAdd = null;
        JobSpecification existingIndexDropSpec = null;
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        // True while mdTxnCtx is open (begun but not yet committed), so it must be aborted on failure.
        boolean bActiveTxn = true;
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        try {
            // Check if the dataverse exists
            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, databaseName, dataverseName);
            if (dv == null) {
                throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc,
                        MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
            }
            // Check if the dataset exists
            ds = metadataProvider.findDataset(databaseName, dataverseName, datasetName);
            if (ds == null) {
                throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                        MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
            }
            // Only internal datasets can be analyzed.
            if (ds.getDatasetType() == DatasetType.INTERNAL) {
                validateDatasetState(metadataProvider, ds, sourceLoc);
            } else {
                throw new CompilationException(ErrorCode.OPERATION_NOT_SUPPORTED, sourceLoc);
            }

            IndexType sampleIndexType = IndexType.SAMPLE;
            // The sample index alternates between two names, so the new sample can be built
            // while the previous one still exists under the other name.
            Pair<String, String> sampleIndexNames = IndexUtil.getSampleIndexNames(datasetName);
            String newIndexName;
            existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), databaseName,
                    dataverseName, datasetName, sampleIndexNames.first);
            if (existingIndex != null) {
                newIndexName = sampleIndexNames.second;
            } else {
                existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                        databaseName, dataverseName, datasetName, sampleIndexNames.second);
                newIndexName = sampleIndexNames.first;
            }

            InternalDatasetDetails dsDetails = (InternalDatasetDetails) ds.getDatasetDetails();
            int sampleCardinalityTarget = stmtAnalyze.getSampleSize();
            long sampleSeed = stmtAnalyze.getOrCreateSampleSeed();

            // The pending record has no statistics yet: cardinality and average tuple size are 0,
            // and the per-index stats map is empty.
            Index.SampleIndexDetails newIndexDetailsPendingAdd = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
                    dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0,
                    sampleSeed, Collections.emptyMap());
            newIndexPendingAdd = new Index(databaseName, dataverseName, datasetName, newIndexName, sampleIndexType,
                    newIndexDetailsPendingAdd, false, false, MetadataUtil.PENDING_ADD_OP, Creator.DEFAULT_CREATOR);

            // #. add a new index with PendingAddOp
            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndexPendingAdd);
            // #. prepare to create the index artifact in NC.
            JobSpecification spec =
                    IndexUtil.buildSecondaryIndexCreationJobSpec(ds, newIndexPendingAdd, metadataProvider, sourceLoc);
            if (spec == null) {
                throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                        "Failed to create job spec for creating index '" + ds.getDatasetName() + "."
                                + newIndexPendingAdd.getIndexName() + "'");
            }
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            bActiveTxn = false;
            // From here on, a failure must remove the committed pending-add record and any NC artifacts.
            progressNewIndexCreate = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;

            // #. create the index artifact in NC.
            runJob(hcc, spec);

            // #. flush dataset
            FlushDatasetUtil.flushDataset(hcc, metadataProvider, databaseName, dataverseName, datasetName);

            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
            bActiveTxn = true;
            metadataProvider.setMetadataTxnContext(mdTxnCtx);

            // #. load data into the index in NC.
            spec = IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, newIndexPendingAdd, metadataProvider, sourceLoc);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            bActiveTxn = false;

            // Run the loading job and collect the statistics of the dataset-stats operator.
            List<IOperatorStats> opStats = runJob(hcc, spec, jobFlags,
                    Collections.singletonList(SampleOperationsHelper.DATASET_STATS_OPERATOR_NAME));
            if (opStats == null || opStats.isEmpty()) {
                // sourceLoc must be the second argument. Passed last, it would bind to the varargs message
                // parameters instead of the exception's source location.
                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
                        "no statistics were collected while loading the sample index");
            }
            DatasetStreamStats stats = new DatasetStreamStats(opStats.get(0));

            Index.SampleIndexDetails newIndexDetailsFinal = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
                    dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget,
                    stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed, stats.getIndexesStats());
            Index newIndexFinal = new Index(databaseName, dataverseName, datasetName, newIndexName, sampleIndexType,
                    newIndexDetailsFinal, false, false, MetadataUtil.PENDING_NO_OP, Creator.DEFAULT_CREATOR);

            // #. begin new metadataTxn
            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
            bActiveTxn = true;
            metadataProvider.setMetadataTxnContext(mdTxnCtx);
            // #. add same new index with PendingNoOp after deleting its entry with PendingAddOp
            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
                    newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
                    newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
            MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndexFinal);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            bActiveTxn = false;
            // The new sample index is now final, so it no longer needs to be rolled back.
            progressNewIndexCreate = ProgressState.NO_PROGRESS;

            if (existingIndex != null) {
                // #. set existing index to PendingDropOp because we'll be dropping it next
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                bActiveTxn = true;
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
                        existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
                        existingIndex.getDatasetName(), existingIndex.getIndexName());
                existingIndex.setPendingOp(MetadataUtil.PENDING_DROP_OP);
                MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), existingIndex);
                existingIndexDropSpec = IndexUtil.buildDropIndexJobSpec(existingIndex, metadataProvider, ds, sourceLoc);
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                // From here on, a failure must finish dropping the existing index (NC artifacts and metadata).
                progressExistingIndexDrop = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
                bActiveTxn = false;

                // #. drop existing index on NCs
                runJob(hcc, existingIndexDropSpec);

                // #. drop existing index metadata
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                bActiveTxn = true;
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
                        existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
                        existingIndex.getDatasetName(), existingIndex.getIndexName());
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                bActiveTxn = false;
                progressExistingIndexDrop = ProgressState.NO_PROGRESS;
            }

        } catch (Exception e) {
            LOGGER.error("failed to analyze dataset; executing compensating operations", e);
            if (bActiveTxn) {
                abort(e, e, mdTxnCtx);
            }

            if (progressExistingIndexDrop == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                // The new sample index is already final. Only the drop of the old index needs to be completed.
                // #. execute compensation operations remove the index in NCs
                try {
                    runJob(hcc, existingIndexDropSpec);
                } catch (Exception e2) {
                    // do no throw exception since still the metadata needs to be compensated.
                    e.addSuppressed(e2);
                }
                // #. remove the record from the metadata.
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                try {
                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
                            existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
                            existingIndex.getDatasetName(), existingIndex.getIndexName());
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                    abort(e, e2, mdTxnCtx);
                    throw new IllegalStateException("System is in inconsistent state: pending index("
                            + existingIndex.getDataverseName() + "." + existingIndex.getDatasetName() + "."
                            + existingIndex.getIndexName() + ") couldn't be removed from the metadata", e);
                }
            } else if (progressNewIndexCreate == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                // The new sample index never became final. Remove its artifacts and its pending-add record.
                // #. execute compensation operations remove the index in NCs
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                bActiveTxn = true;
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                try {
                    JobSpecification jobSpec =
                            IndexUtil.buildDropIndexJobSpec(newIndexPendingAdd, metadataProvider, ds, sourceLoc);
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                    bActiveTxn = false;
                    runJob(hcc, jobSpec);
                } catch (Exception e2) {
                    // Keep going: the metadata record still has to be removed below.
                    e.addSuppressed(e2);
                    if (bActiveTxn) {
                        abort(e, e2, mdTxnCtx);
                    }
                }
                // #. remove the record from the metadata.
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                try {
                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
                            newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
                            newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                    abort(e, e2, mdTxnCtx);
                    throw new IllegalStateException("System is in inconsistent state: pending index("
                            + newIndexPendingAdd.getDataverseName() + "." + newIndexPendingAdd.getDatasetName() + "."
                            + newIndexPendingAdd.getIndexName() + ") couldn't be removed from the metadata", e);
                }
            }

            throw e;
        }
    }