in asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java [880:1148]
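/**
 * Creates the dataset described by the given DatasetDecl: validates the target dataverse and the (possibly
 * inline) item/meta types, builds the type-specific dataset details, registers the dataset in the metadata
 * with a pending-add operation, creates its physical storage for internal datasets, and finally re-registers
 * it with no pending operation. If the pending-add record was already committed when a failure occurs, the
 * compensation logic in the catch block drops the created storage and removes the pending metadata entries.
 *
 * @return the created dataset, or Optional.empty() if the dataset already exists and IF NOT EXISTS was used
 */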
protected Optional<? extends Dataset> doCreateDatasetStatement(MetadataProvider metadataProvider, DatasetDecl dd,
Namespace namespace, String datasetName, Namespace itemTypeNamespace, TypeExpression itemTypeExpr,
String itemTypeName, TypeExpression metaItemTypeExpr, Namespace metaItemTypeNamespace,
String metaItemTypeName, IHyracksClientConnection hcc, IRequestParameters requestParameters,
Creator creator) throws Exception {
DataverseName dataverseName = namespace.getDataverseName();
String databaseName = namespace.getDatabaseName();
DataverseName itemTypeDataverseName = null;
String itemTypeDatabaseName = null;
if (itemTypeNamespace != null) {
itemTypeDataverseName = itemTypeNamespace.getDataverseName();
itemTypeDatabaseName = itemTypeNamespace.getDatabaseName();
}
DataverseName metaItemTypeDataverseName = null;
String metaItemTypeDatabaseName = null;
if (metaItemTypeNamespace != null) {
metaItemTypeDataverseName = metaItemTypeNamespace.getDataverseName();
metaItemTypeDatabaseName = metaItemTypeNamespace.getDatabaseName();
}
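// Track how far the DDL has progressed so the catch block knows whether compensation
// (dropping created storage and pending metadata entries) is required.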
MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
SourceLocation sourceLoc = dd.getSourceLocation();
DatasetType dsType = dd.getDatasetType();
String ngNameId = dd.getNodegroupName();
String compactionPolicy = dd.getCompactionPolicy();
Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
String compressionScheme = metadataProvider.getCompressionManager()
.getDdlOrDefaultCompressionScheme(dd.getDatasetCompressionScheme());
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Dataset dataset = null;
Datatype itemTypeEntity = null, metaItemTypeEntity = null;
boolean itemTypeAdded = false, metaItemTypeAdded = false;
StorageProperties storageProperties = metadataProvider.getStorageProperties();
DatasetFormatInfo datasetFormatInfo = dd.getDatasetFormatInfo(storageProperties.getStorageFormat(),
storageProperties.getColumnMaxTupleCount(), storageProperties.getColumnFreeSpaceTolerance(),
storageProperties.getColumnMaxLeafNodeSize());
try {
//TODO(DB): also check for database existence?
// Check if the dataverse exists
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, databaseName, dataverseName);
if (dv == null) {
throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc,
MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
}
IDatasetDetails datasetDetails;
Dataset ds = metadataProvider.findDataset(databaseName, dataverseName, datasetName, true);
if (ds != null) {
if (ds.getDatasetType() == DatasetType.VIEW) {
throw new CompilationException(ErrorCode.VIEW_EXISTS, sourceLoc,
DatasetUtil.getFullyQualifiedDisplayName(ds));
}
if (dd.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return Optional.empty();
} else {
throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName);
}
}
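// If the dataset is defined by a query, rewrite the query to validate it, collect the dataset's
// dependencies, and check that the request is authorized.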
if (dd.getQuery() != null) {
IQueryRewriter queryRewriter = rewriterFactory.createQueryRewriter();
Query wrappedQuery = queryRewriter.createDatasetAccessorQuery(dd, namespaceResolver, namespace);
dd.setNamespace(namespace);
LangRewritingContext langRewritingContext =
createLangRewritingContext(metadataProvider, declaredFunctions, null,
Collections.singletonList(dd), warningCollector, wrappedQuery.getVarCounter());
apiFramework.reWriteQuery(langRewritingContext, wrappedQuery, sessionOutput, false, false,
Collections.emptyList());
LangDatasetUtil.getDatasetDependencies(metadataProvider, dd, queryRewriter);
appCtx.getReceptionist().ensureAuthorized(requestParameters, metadataProvider);
}
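// Internal datasets may declare explicit types for the primary key (partitioning) expressions.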
List<TypeExpression> partitioningExprTypes = null;
if (dsType == DatasetType.INTERNAL) {
partitioningExprTypes = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprTypes();
}
Pair<Datatype, Boolean> itemTypePair = fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
partitioningExprTypes, itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeExpr,
false, metadataProvider, sourceLoc);
itemTypeEntity = itemTypePair.first;
IAType itemType = itemTypeEntity.getDatatype();
boolean itemTypeIsInline = itemTypePair.second;
String ngName = ngNameId != null ? ngNameId
: configureNodegroupForDataset(appCtx, dd.getHints(), databaseName, dataverseName, datasetName,
metadataProvider, sourceLoc);
boolean userProvidedCompactionPolicy = compactionPolicy != null;
if (compactionPolicy == null) {
compactionPolicy = StorageConstants.DEFAULT_COMPACTION_POLICY_NAME;
compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES;
} else {
validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc);
}
IAType metaItemType = null;
boolean metaItemTypeIsInline = false;
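// Build the type-specific dataset details: primary key, optional meta type, and filter field for
// internal datasets; adapter properties for external datasets.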
switch (dsType) {
case INTERNAL:
if (metaItemTypeExpr != null) {
Pair<Datatype, Boolean> metaItemTypePair =
fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
partitioningExprTypes, metaItemTypeDatabaseName, metaItemTypeDataverseName,
metaItemTypeName, metaItemTypeExpr, true, metadataProvider, sourceLoc);
metaItemTypeEntity = metaItemTypePair.first;
metaItemType = metaItemTypeEntity.getDatatype();
metaItemTypeIsInline = metaItemTypePair.second;
}
ARecordType metaRecType = (ARecordType) metaItemType;
List<List<String>> partitioningExprs =
((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
List<Integer> keySourceIndicators =
((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
ARecordType aRecordType = (ARecordType) itemType;
List<IAType> partitioningTypes = validatePartitioningExpressions(aRecordType, metaRecType,
partitioningExprs, keySourceIndicators, autogenerated, sourceLoc, partitioningExprTypes);
List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
Integer filterSourceIndicator =
((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterSourceIndicator();
if (filterField != null) {
ValidateUtil.validateFilterField(aRecordType, metaRecType, filterSourceIndicator, filterField,
sourceLoc);
}
if (!userProvidedCompactionPolicy && filterField != null) {
// If the dataset has a filter and the user didn't specify a merge policy,
// pick the correlated-prefix merge policy as the default.
compactionPolicy = StorageConstants.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES;
}
boolean isDatasetWithoutTypeSpec = isDatasetWithoutTypeSpec(dd, aRecordType, metaRecType);
// Validate dataset properties if the format is COLUMN
ColumnPropertiesValidationUtil.validate(sourceLoc, datasetFormatInfo.getFormat(), compactionPolicy,
filterField);
datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
keySourceIndicators, partitioningTypes, autogenerated, filterSourceIndicator, filterField,
isDatasetWithoutTypeSpec);
break;
case EXTERNAL:
ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
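// Normalize and validate the external adapter properties against the declared item type.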
Map<String, String> properties = createExternalDatasetProperties(databaseName, dataverseName, dd,
itemTypeEntity, metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType) itemType);
validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
appCtx, metadataProvider);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
default:
// the dataset has not been created yet at this point, so report the declared type
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, dsType.toString());
}
// #. initialize DatasetIdFactory if it is not initialized.
if (!DatasetIdFactory.isInitialized()) {
DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
}
// #. add a new dataset with PendingAddOp
dataset = (Dataset) createDataset(dd, databaseName, dataverseName, datasetName, itemTypeDatabaseName,
itemTypeDataverseName, itemTypeName, metaItemTypeDatabaseName, metaItemTypeDataverseName,
metaItemTypeName, dsType, compactionPolicy, compactionPolicyProperties, compressionScheme,
datasetFormatInfo, datasetDetails, ngName, creator);
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
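// Inline (anonymous) item/meta types are persisted as metadata datatypes so the dataset's type references resolve.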
if (itemTypeIsInline) {
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity);
itemTypeAdded = true;
}
if (metaItemTypeIsInline) {
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, metaItemTypeEntity);
metaItemTypeAdded = true;
}
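// Internal datasets also need their physical storage created on the cluster; this is done by a Hyracks job
// that can only run after the metadata transaction holding the pending-add record has been committed.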
if (dsType == DatasetType.INTERNAL) {
JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
// #. make metadataTxn commit before calling runJob.
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
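// From this point on the pending-add record is committed, so any later failure must undo both the
// storage created by the job and the pending metadata entries (see the catch block below).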
// #. runJob
runJob(hcc, jobSpec);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
}
// #. add a new dataset with PendingNoOp after deleting the dataset with
// PendingAddOp
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), databaseName, dataverseName,
datasetName, requestParameters.isForceDropDataset());
dataset.setPendingOp(MetadataUtil.PENDING_NO_OP);
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
beforeTxnCommit(metadataProvider, creator,
EntityDetails.newDataset(databaseName, dataverseName, datasetName));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
if (progress.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
// #. execute compensation operations
// remove the dataset's storage on the NCs
// [Notice]
// As long as we have updated (and committed) the metadata, we must remove any effect of
// the job, because an exception occurred during runJob.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
JobSpecification jobSpec =
DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider, EnumSet.of(DropOption.IF_EXISTS));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
abort(e, e2, mdTxnCtx);
}
}
// remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
MetadataManager.INSTANCE.dropDataset(mdTxnCtx, databaseName, dataverseName, datasetName,
requestParameters.isForceDropDataset());
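// Also drop any inline types that were added as part of this dataset creation.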
if (itemTypeAdded) {
MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemTypeEntity.getDatabaseName(),
itemTypeEntity.getDataverseName(), itemTypeEntity.getDatatypeName());
}
if (metaItemTypeAdded) {
MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, metaItemTypeEntity.getDatabaseName(),
metaItemTypeEntity.getDataverseName(), metaItemTypeEntity.getDatatypeName());
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
throw new IllegalStateException("System is in an inconsistent state: pending dataset(" + dataverseName
+ "." + datasetName + ") couldn't be removed from the metadata", e);
}
}
throw e;
}
return Optional.of(dataset);
}