in asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java [4959:5163]
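    /**
     * Handles the ANALYZE DATASET statement: creates a new sample index on the dataset using the
     * metadata pending-op protocol (add a PendingAddOp record, build and load the index artifact on
     * the NCs, then replace the record with a PendingNoOp one), collecting dataset stats during the
     * load. Any previously existing sample index is dropped afterwards. On failure, compensating
     * operations in the catch block undo whatever progress was made.
     */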
protected void doAnalyzeDataset(MetadataProvider metadataProvider, AnalyzeStatement stmtAnalyze,
String databaseName, DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
IRequestParameters requestParameters) throws Exception {
SourceLocation sourceLoc = stmtAnalyze.getSourceLocation();
ProgressState progressNewIndexCreate = ProgressState.NO_PROGRESS;
ProgressState progressExistingIndexDrop = ProgressState.NO_PROGRESS;
Dataset ds = null;
Index existingIndex = null, newIndexPendingAdd = null;
JobSpecification existingIndexDropSpec = null;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
// Check if the dataverse exists
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, databaseName, dataverseName);
if (dv == null) {
throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc,
MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
}
// Check if the dataset exists
ds = metadataProvider.findDataset(databaseName, dataverseName, datasetName);
if (ds == null) {
throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
}
if (ds.getDatasetType() == DatasetType.INTERNAL) {
validateDatasetState(metadataProvider, ds, sourceLoc);
} else {
throw new CompilationException(ErrorCode.OPERATION_NOT_SUPPORTED, sourceLoc);
}
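            // A dataset keeps at most two sample indexes, under two alternating names. The new sample
            // is built under whichever name is currently unused, so the existing sample (if any)
            // remains in place until the new one is committed below.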
IndexType sampleIndexType = IndexType.SAMPLE;
Pair<String, String> sampleIndexNames = IndexUtil.getSampleIndexNames(datasetName);
String newIndexName;
existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), databaseName,
dataverseName, datasetName, sampleIndexNames.first);
if (existingIndex != null) {
newIndexName = sampleIndexNames.second;
} else {
existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
databaseName, dataverseName, datasetName, sampleIndexNames.second);
newIndexName = sampleIndexNames.first;
}
InternalDatasetDetails dsDetails = (InternalDatasetDetails) ds.getDatasetDetails();
int sampleCardinalityTarget = stmtAnalyze.getSampleSize();
long sampleSeed = stmtAnalyze.getOrCreateSampleSeed();
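            // The sample's source cardinality and average tuple size are unknown until the load job
            // has run, so record them as 0 for now; the final index entry below replaces them with
            // the measured values.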
Index.SampleIndexDetails newIndexDetailsPendingAdd = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0,
sampleSeed, Collections.emptyMap());
newIndexPendingAdd = new Index(databaseName, dataverseName, datasetName, newIndexName, sampleIndexType,
newIndexDetailsPendingAdd, false, false, MetadataUtil.PENDING_ADD_OP, Creator.DEFAULT_CREATOR);
// #. add a new index with PendingAddOp
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndexPendingAdd);
// #. prepare to create the index artifact in NC.
JobSpecification spec =
IndexUtil.buildSecondaryIndexCreationJobSpec(ds, newIndexPendingAdd, metadataProvider, sourceLoc);
if (spec == null) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
"Failed to create job spec for creating index '" + ds.getDatasetName() + "."
+ newIndexPendingAdd.getIndexName() + "'");
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
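            // a committed PendingAddOp record for the new index now exists in the metadata; from this
            // point on, the catch block must clean it up if anything fails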
progressNewIndexCreate = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
// #. create the index artifact in NC.
runJob(hcc, spec);
// #. flush dataset
FlushDatasetUtil.flushDataset(hcc, metadataProvider, databaseName, dataverseName, datasetName);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// #. load data into the index in NC.
spec = IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, newIndexPendingAdd, metadataProvider, sourceLoc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
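            // run the load job, collecting the runtime stats of the dataset-scan operator; these
            // provide the sample's source cardinality, average tuple size, and per-index stats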
List<IOperatorStats> opStats = runJob(hcc, spec, jobFlags,
Collections.singletonList(SampleOperationsHelper.DATASET_STATS_OPERATOR_NAME));
            if (opStats == null || opStats.isEmpty()) {
                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc,
                        "sample load job reported no operator stats");
            }
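            // the single requested operator stat carries the dataset-level measurements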
DatasetStreamStats stats = new DatasetStreamStats(opStats.get(0));
Index.SampleIndexDetails newIndexDetailsFinal = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget,
stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed, stats.getIndexesStats());
Index newIndexFinal = new Index(databaseName, dataverseName, datasetName, newIndexName, sampleIndexType,
newIndexDetailsFinal, false, false, MetadataUtil.PENDING_NO_OP, Creator.DEFAULT_CREATOR);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// #. add same new index with PendingNoOp after deleting its entry with PendingAddOp
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndexFinal);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progressNewIndexCreate = ProgressState.NO_PROGRESS;
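            // the new sample is now committed; if an older sample index exists, retire it: mark it
            // PendingDropOp, drop its artifact on the NCs, then remove its metadata record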
if (existingIndex != null) {
// #. set existing index to PendingDropOp because we'll be dropping it next
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
existingIndex.getDatasetName(), existingIndex.getIndexName());
existingIndex.setPendingOp(MetadataUtil.PENDING_DROP_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), existingIndex);
existingIndexDropSpec = IndexUtil.buildDropIndexJobSpec(existingIndex, metadataProvider, ds, sourceLoc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
progressExistingIndexDrop = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
bActiveTxn = false;
// #. drop existing index on NCs
runJob(hcc, existingIndexDropSpec);
// #. drop existing index metadata
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
existingIndex.getDatasetName(), existingIndex.getIndexName());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progressExistingIndexDrop = ProgressState.NO_PROGRESS;
}
} catch (Exception e) {
LOGGER.error("failed to analyze dataset; executing compensating operations", e);
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
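            // at most one of the two progress flags below can be set, since the new-index flag is
            // cleared before the existing-index drop begins; undo whichever phase left a pending
            // metadata record behind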
if (progressExistingIndexDrop == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                // #. execute compensation operations to remove the index on NCs
try {
runJob(hcc, existingIndexDropSpec);
} catch (Exception e2) {
                    // do not throw the exception since the metadata still needs to be compensated.
e.addSuppressed(e2);
}
// #. remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
existingIndex.getDatasetName(), existingIndex.getIndexName());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
throw new IllegalStateException("System is inconsistent state: pending index("
+ existingIndex.getDataverseName() + "." + existingIndex.getDatasetName() + "."
+ existingIndex.getIndexName() + ") couldn't be removed from the metadata", e);
}
} else if (progressNewIndexCreate == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
                // #. execute compensation operations to remove the index on NCs
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
JobSpecification jobSpec =
IndexUtil.buildDropIndexJobSpec(newIndexPendingAdd, metadataProvider, ds, sourceLoc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
abort(e, e2, mdTxnCtx);
}
}
// #. remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
throw new IllegalStateException("System is in inconsistent state: pending index("
+ newIndexPendingAdd.getDataverseName() + "." + newIndexPendingAdd.getDatasetName() + "."
+ newIndexPendingAdd.getIndexName() + ") couldn't be removed from the metadata", e);
}
}
throw e;
}
}