protected Optional&lt;? extends Dataset&gt; doCreateDatasetStatement()

in asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java [880:1148]


    protected Optional<? extends Dataset> doCreateDatasetStatement(MetadataProvider metadataProvider, DatasetDecl dd,
            Namespace namespace, String datasetName, Namespace itemTypeNamespace, TypeExpression itemTypeExpr,
            String itemTypeName, TypeExpression metaItemTypeExpr, Namespace metaItemTypeNamespace,
            String metaItemTypeName, IHyracksClientConnection hcc, IRequestParameters requestParameters,
            Creator creator) throws Exception {
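        // Resolve the target database and dataverse from the dataset's namespace.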
        DataverseName dataverseName = namespace.getDataverseName();
        String databaseName = namespace.getDatabaseName();

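        // Resolve the namespaces qualifying the item type and the meta item type, if any were given.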
        DataverseName itemTypeDataverseName = null;
        String itemTypeDatabaseName = null;
        if (itemTypeNamespace != null) {
            itemTypeDataverseName = itemTypeNamespace.getDataverseName();
            itemTypeDatabaseName = itemTypeNamespace.getDatabaseName();
        }

        DataverseName metaItemTypeDataverseName = null;
        String metaItemTypeDatabaseName = null;
        if (metaItemTypeNamespace != null) {
            metaItemTypeDataverseName = metaItemTypeNamespace.getDataverseName();
            metaItemTypeDatabaseName = metaItemTypeNamespace.getDatabaseName();
        }

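        // Capture the DDL options (nodegroup, compaction policy, compression, storage format),
        // start a metadata transaction, and track progress so that failures can be compensated.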
        MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
        SourceLocation sourceLoc = dd.getSourceLocation();
        DatasetType dsType = dd.getDatasetType();
        String ngNameId = dd.getNodegroupName();
        String compactionPolicy = dd.getCompactionPolicy();
        Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
        String compressionScheme = metadataProvider.getCompressionManager()
                .getDdlOrDefaultCompressionScheme(dd.getDatasetCompressionScheme());
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        boolean bActiveTxn = true;
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        Dataset dataset = null;
        Datatype itemTypeEntity = null, metaItemTypeEntity = null;
        boolean itemTypeAdded = false, metaItemTypeAdded = false;

        StorageProperties storageProperties = metadataProvider.getStorageProperties();
        DatasetFormatInfo datasetFormatInfo = dd.getDatasetFormatInfo(storageProperties.getStorageFormat(),
                storageProperties.getColumnMaxTupleCount(), storageProperties.getColumnFreeSpaceTolerance(),
                storageProperties.getColumnMaxLeafNodeSize());
        try {
            //TODO(DB): also check for database existence?

            // Check if the dataverse exists
            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, databaseName, dataverseName);
            if (dv == null) {
                throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc,
                        MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase()));
            }

            IDatasetDetails datasetDetails;
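            // A view with the same name is always an error; an existing dataset is tolerated only with IF NOT EXISTS.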
            Dataset ds = metadataProvider.findDataset(databaseName, dataverseName, datasetName, true);
            if (ds != null) {
                if (ds.getDatasetType() == DatasetType.VIEW) {
                    throw new CompilationException(ErrorCode.VIEW_EXISTS, sourceLoc,
                            DatasetUtil.getFullyQualifiedDisplayName(ds));
                }
                if (dd.getIfNotExists()) {
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                    return Optional.empty();
                } else {
                    throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc, datasetName, dataverseName);
                }
            }
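            // If the dataset declaration embeds a query, rewrite it, record its dependencies, and check authorization.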
            if (dd.getQuery() != null) {
                IQueryRewriter queryRewriter = rewriterFactory.createQueryRewriter();
                Query wrappedQuery = queryRewriter.createDatasetAccessorQuery(dd, namespaceResolver, namespace);
                dd.setNamespace(namespace);
                LangRewritingContext langRewritingContext =
                        createLangRewritingContext(metadataProvider, declaredFunctions, null,
                                Collections.singletonList(dd), warningCollector, wrappedQuery.getVarCounter());
                apiFramework.reWriteQuery(langRewritingContext, wrappedQuery, sessionOutput, false, false,
                        Collections.emptyList());

                LangDatasetUtil.getDatasetDependencies(metadataProvider, dd, queryRewriter);
                appCtx.getReceptionist().ensureAuthorized(requestParameters, metadataProvider);
            }

            List<TypeExpression> partitioningExprTypes = null;
            if (dsType == DatasetType.INTERNAL) {
                partitioningExprTypes = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprTypes();
            }

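            // Resolve the dataset's item type; it may be declared inline rather than referenced by name.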
            Pair<Datatype, Boolean> itemTypePair = fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
                    partitioningExprTypes, itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeExpr,
                    false, metadataProvider, sourceLoc);
            itemTypeEntity = itemTypePair.first;
            IAType itemType = itemTypeEntity.getDatatype();
            boolean itemTypeIsInline = itemTypePair.second;

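            // Use the node group given in the DDL, or configure one for the dataset otherwise.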
            String ngName = ngNameId != null ? ngNameId
                    : configureNodegroupForDataset(appCtx, dd.getHints(), databaseName, dataverseName, datasetName,
                            metadataProvider, sourceLoc);

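            // Default the compaction (merge) policy if none was specified, otherwise validate the given one.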
            if (compactionPolicy == null) {
                compactionPolicy = StorageConstants.DEFAULT_COMPACTION_POLICY_NAME;
                compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES;
            } else {
                validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc);
            }

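            // Build the dataset details according to the dataset type.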
            IAType metaItemType = null;
            boolean metaItemTypeIsInline = false;
            switch (dsType) {
                case INTERNAL:
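                    // Resolve the optional meta item type, then validate the partitioning and filter fields.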
                    if (metaItemTypeExpr != null) {
                        Pair<Datatype, Boolean> metaItemTypePair =
                                fetchDatasetItemType(mdTxnCtx, dsType, datasetFormatInfo.getFormat(),
                                        partitioningExprTypes, metaItemTypeDatabaseName, metaItemTypeDataverseName,
                                        metaItemTypeName, metaItemTypeExpr, true, metadataProvider, sourceLoc);
                        metaItemTypeEntity = metaItemTypePair.first;
                        metaItemType = metaItemTypeEntity.getDatatype();
                        metaItemTypeIsInline = metaItemTypePair.second;
                    }
                    ARecordType metaRecType = (ARecordType) metaItemType;

                    List<List<String>> partitioningExprs =
                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();

                    List<Integer> keySourceIndicators =
                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
                    boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
                    ARecordType aRecordType = (ARecordType) itemType;
                    List<IAType> partitioningTypes = validatePartitioningExpressions(aRecordType, metaRecType,
                            partitioningExprs, keySourceIndicators, autogenerated, sourceLoc, partitioningExprTypes);

                    List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
                    Integer filterSourceIndicator =
                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterSourceIndicator();

                    if (filterField != null) {
                        ValidateUtil.validateFilterField(aRecordType, metaRecType, filterSourceIndicator, filterField,
                                sourceLoc);
                    }
                    if (compactionPolicy == null && filterField != null) {
                        // If the dataset has a filter and the user didn't specify a merge policy,
                        // pick correlated-prefix as the default merge policy.
                        compactionPolicy = StorageConstants.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                        compactionPolicyProperties = StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                    }
                    boolean isDatasetWithoutTypeSpec = isDatasetWithoutTypeSpec(dd, aRecordType, metaRecType);
                    // Validate dataset properties if the format is COLUMN
                    ColumnPropertiesValidationUtil.validate(sourceLoc, datasetFormatInfo.getFormat(), compactionPolicy,
                            filterField);
                    datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                            InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
                            keySourceIndicators, partitioningTypes, autogenerated, filterSourceIndicator, filterField,
                            isDatasetWithoutTypeSpec);
                    break;
                case EXTERNAL:
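                    // Normalize and validate the external dataset's adapter properties against the item type.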
                    ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
                    Map<String, String> properties = createExternalDatasetProperties(databaseName, dataverseName, dd,
                            itemTypeEntity, metadataProvider, mdTxnCtx);
                    ExternalDataUtils.normalize(properties);
                    ExternalDataUtils.validate(properties);
                    ExternalDataUtils.validateType(properties, (ARecordType) itemType);
                    validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
                            appCtx, metadataProvider);
                    datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                            TransactionState.COMMIT);
                    break;
                default:
                    throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE, dsType.toString());
            }

            // #. initialize DatasetIdFactory if it is not initialized.
            if (!DatasetIdFactory.isInitialized()) {
                DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
            }

            // #. add a new dataset with PendingAddOp
            dataset = (Dataset) createDataset(dd, databaseName, dataverseName, datasetName, itemTypeDatabaseName,
                    itemTypeDataverseName, itemTypeName, metaItemTypeDatabaseName, metaItemTypeDataverseName,
                    metaItemTypeName, dsType, compactionPolicy, compactionPolicyProperties, compressionScheme,
                    datasetFormatInfo, datasetDetails, ngName, creator);
            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);

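            // Persist any inline (anonymous) item/meta datatypes created for this dataset.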
            if (itemTypeIsInline) {
                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, itemTypeEntity);
                itemTypeAdded = true;
            }
            if (metaItemTypeIsInline) {
                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, metaItemTypeEntity);
                metaItemTypeAdded = true;
            }

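            // For internal datasets, run a job to create the dataset's physical storage.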
            if (dsType == DatasetType.INTERNAL) {
                JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
                // #. make metadataTxn commit before calling runJob.
                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                bActiveTxn = false;
                progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);

                // #. runJob
                runJob(hcc, jobSpec);

                // #. begin new metadataTxn
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                bActiveTxn = true;
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
            }

            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), databaseName, dataverseName,
                    datasetName, requestParameters.isForceDropDataset());
            dataset.setPendingOp(MetadataUtil.PENDING_NO_OP);
            MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
            beforeTxnCommit(metadataProvider, creator,
                    EntityDetails.newDataset(databaseName, dataverseName, datasetName));
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        } catch (Exception e) {
            if (bActiveTxn) {
                abort(e, e, mdTxnCtx);
            }

            if (progress.getValue() == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {

                // #. execute compensation operations
                // remove the dataset's storage (primary index) on the NCs
                // [Notice]
                // Since we already updated (and committed) the metadata, we must remove any
                // effects of the job when an exception occurs during runJob.
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                bActiveTxn = true;
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                try {
                    JobSpecification jobSpec =
                            DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider, EnumSet.of(DropOption.IF_EXISTS));
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                    bActiveTxn = false;
                    runJob(hcc, jobSpec);
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                    if (bActiveTxn) {
                        abort(e, e2, mdTxnCtx);
                    }
                }

                // remove the record from the metadata.
                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                metadataProvider.setMetadataTxnContext(mdTxnCtx);
                try {
                    MetadataManager.INSTANCE.dropDataset(mdTxnCtx, databaseName, dataverseName, datasetName,
                            requestParameters.isForceDropDataset());
                    if (itemTypeAdded) {
                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemTypeEntity.getDatabaseName(),
                                itemTypeEntity.getDataverseName(), itemTypeEntity.getDatatypeName());
                    }
                    if (metaItemTypeAdded) {
                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, metaItemTypeEntity.getDatabaseName(),
                                metaItemTypeEntity.getDataverseName(), metaItemTypeEntity.getDatatypeName());
                    }
                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                    abort(e, e2, mdTxnCtx);
                    throw new IllegalStateException("System is in an inconsistent state: pending dataset("
                            + dataverseName + "." + datasetName + ") couldn't be removed from the metadata", e);
                }
            }
            throw e;
        }
        return Optional.of(dataset);
    }