in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java [116:689]
/**
 * Adds secondary-index maintenance operators to a plan that modifies the primary index of a dataset.
 *
 * <p>The rule matches a SINK operator, or a DELEGATE operator whose delegate is a {@code CommitOperator},
 * when its first input is an {@code InsertDeleteUpsertOperator}. For each secondary index of the target
 * dataset an {@code IndexInsertDeleteUpsertOperator} is created. Depending on the index it is fed by:
 * <ul>
 * <li>a {@code TokenizeOperator} (bulk load of an index that is neither BTREE, ARRAY nor RTREE),</li>
 * <li>an {@code AssignOperator} computing CREATE_MBR coordinates (RTREE), followed by a second assign for
 * the before-operation keys on UPSERT,</li>
 * <li>nested unnest plans (ARRAY, not bulk load) or an unnest branch below the operator (ARRAY, bulk
 * load),</li>
 * <li>or directly by the current top of the plan.</li>
 * </ul>
 *
 * <p>When not bulk loading, the index operators are chained and the last one becomes the only input of the
 * matched operator. When bulk loading, every index operator is added as an input of the matched operator,
 * and a {@code ReplicateOperator} fans the primary operator's output out when more than one index is
 * maintained.
 *
 * <p>Side effect: the {@code sourceLoc} and {@code context} fields of this rule are overwritten on each
 * invocation that reaches the primary modification operator.
 *
 * @param opRef
 *            reference to the operator being visited
 * @param context
 *            optimization context used to create variables and compute type environments
 * @return {@code true} if the plan was rewritten; {@code false} if the operator pattern does not match, the
 *         dataset is external, or there is no index to maintain
 * @throws AlgebricksException
 *             (as a {@code CompilationException}) if more than one meta record expression is present, the
 *             dataset cannot be found, or the dataset item type is not an object type
 */
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
        throws AlgebricksException {
    AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
    // Only a SINK, or a DELEGATE wrapping a CommitOperator, can be the root of the pattern.
    if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
            && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
        return false;
    }
    if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
        DelegateOperator eOp = (DelegateOperator) op0;
        if (!(eOp.getDelegate() instanceof CommitOperator)) {
            return false;
        }
    }
    AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
    if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
        return false;
    }
    /* find the record variable */
    InsertDeleteUpsertOperator primaryIndexModificationOp = (InsertDeleteUpsertOperator) op1;
    boolean isBulkload = primaryIndexModificationOp.isBulkload();
    ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
    List<Mutable<ILogicalExpression>> newMetaExprs =
            primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
    LogicalVariable newRecordVar;
    LogicalVariable newMetaVar = null;
    // Both are fields of the rule; they are refreshed for the operator currently being rewritten.
    sourceLoc = primaryIndexModificationOp.getSourceLocation();
    this.context = context;
    /*
     * inputOp is the assign operator which extracts primary keys from the input
     * variables (record or meta)
     */
    AbstractLogicalOperator inputOp =
            (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
    newRecordVar = getRecordVar(inputOp, newRecordExpr, 0);
    if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
        if (newMetaExprs.size() > 1) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                    "Number of meta records can't be more than 1. Number of meta records found = "
                            + newMetaExprs.size());
        }
        newMetaVar = getRecordVar(inputOp, newMetaExprs.get(0).getValue(), 1);
    }
    /*
     * At this point, we have the record variable and the insert/delete/upsert operator
     * Note: We have two operators:
     * 1. An InsertDeleteOperator (primary)
     * 2. An IndexInsertDeleteOperator (secondary)
     * The current primaryIndexModificationOp is of the first type
     */
    DataSource datasetSource = (DataSource) primaryIndexModificationOp.getDataSource();
    MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
    DataverseName dataverseName = datasetSource.getId().getDataverseName();
    String database = datasetSource.getId().getDatabaseName();
    String datasetName = datasetSource.getId().getDatasourceName();
    Dataset dataset = mp.findDataset(database, dataverseName, datasetName);
    if (dataset == null) {
        throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                MetadataUtil.dataverseName(database, dataverseName, mp.isUsingDatabase()));
    }
    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
        return false;
    }
    // Create operators for secondary index insert / delete.
    String itemTypeName = dataset.getItemTypeName();
    IAType itemType =
            mp.findType(dataset.getItemTypeDatabaseName(), dataset.getItemTypeDataverseName(), itemTypeName);
    if (itemType.getTypeTag() != ATypeTag.OBJECT) {
        throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Only record types can be indexed.");
    }
    ARecordType recType = (ARecordType) itemType;
    // meta type
    ARecordType metaType = null;
    if (dataset.hasMetaPart()) {
        metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDatabaseName(),
                dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
    }
    recType = (ARecordType) mp.findTypeForDatasetWithoutType(recType, metaType, dataset);
    List<Index> indexes =
            mp.getDatasetIndexes(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
    // SAMPLE indexes are never maintained by this rule.
    Stream<Index> indexStream = indexes.stream();
    indexStream = indexStream.filter(index -> index.getIndexType() != IndexType.SAMPLE);
    if (primaryIndexModificationOp.getOperation() == Kind.INSERT && !isBulkload) {
        // for insert, the primary key index is handled together with the primary index
        indexStream = indexStream.filter(index -> !index.isPrimaryKeyIndex());
    }
    indexes = indexStream.collect(Collectors.toList());
    // Put an n-gram or a keyword index in the later stage of index-update,
    // since TokenizeOperator needs to be involved.
    // The order is defined by the declaration order of IndexType; Integer.compare avoids the
    // subtraction-as-comparison idiom while yielding the same ordering.
    Collections.sort(indexes,
            (o1, o2) -> Integer.compare(o1.getIndexType().ordinal(), o2.getIndexType().ordinal()));
    // Set the top operator pointer to the primary IndexInsertDeleteOperator
    ILogicalOperator currentTop = primaryIndexModificationOp;
    // At this point, we have the data type info, and the indexes info as well.
    // NOTE(review): the "- 1" presumably discounts the primary index, which is still in the list - confirm.
    int secondaryIndexTotalCnt = indexes.size() - 1;
    if (secondaryIndexTotalCnt > 0) {
        // Detach the inputs of op0 (the SINK / commit); they are re-attached once the index operators exist.
        op0.getInputs().clear();
    } else {
        return false;
    }
    // Prepare filtering field information (This is the filter created using the "filter with" key word in the
    // create dataset ddl)
    List<String> filteringFields = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
    List<Mutable<ILogicalExpression>> filteringExpressions = null;
    if (filteringFields != null) {
        // The filter field var already exists. we can simply get it from the insert op
        filteringExpressions = new ArrayList<>();
        for (Mutable<ILogicalExpression> filteringExpression : primaryIndexModificationOp
                .getAdditionalFilteringExpressions()) {
            // A fresh list per expression: getUsedVariables appends to the list it is given, so sharing one
            // list across iterations would re-emit the variables of earlier expressions.
            List<LogicalVariable> filteringVars = new ArrayList<>();
            filteringExpression.getValue().getUsedVariables(filteringVars);
            for (LogicalVariable var : filteringVars) {
                VariableReferenceExpression varRef = new VariableReferenceExpression(var);
                varRef.setSourceLocation(filteringExpression.getValue().getSourceLocation());
                filteringExpressions.add(new MutableObject<ILogicalExpression>(varRef));
            }
        }
    }
    // Replicate Operator is applied only when doing the bulk-load.
    ReplicateOperator replicateOp = null;
    if (secondaryIndexTotalCnt > 1 && isBulkload) {
        // Split the logical plan into "each secondary index update branch"
        // to replicate each <PK,OBJECT> pair.
        replicateOp = new ReplicateOperator(secondaryIndexTotalCnt);
        replicateOp.setSourceLocation(sourceLoc);
        replicateOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
        replicateOp.setExecutionMode(ExecutionMode.PARTITIONED);
        context.computeAndSetTypeEnvironmentForOperator(replicateOp);
        currentTop = replicateOp;
    }
    /*
     * The two maps are used to store variables to which [casted] field access is assigned.
     * One for the beforeOp record and the other for the new record.
     * There are two uses for these maps:
     * 1. used for shared fields in indexes with overlapping keys.
     * 2. used for setting variables of secondary keys for each secondary index operator.
     */
    Map<IndexFieldId, LogicalVariable> fieldVarsForBeforeOperation = new HashMap<>();
    Map<IndexFieldId, LogicalVariable> fieldVarsForNewRecord = new HashMap<>();
    /*
     * if the index is enforcing field types (For open indexes), We add a cast
     * operator to ensure type safety
     */
    if (primaryIndexModificationOp.getOperation() == Kind.INSERT
            || primaryIndexModificationOp.getOperation() == Kind.UPSERT
            /* Actually, delete should not be here but it is now until issue
             * https://issues.apache.org/jira/browse/ASTERIXDB-1507
             * is solved
             */
            || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
        // The returned operator is intentionally ignored here; currentTop is left unchanged.
        injectFieldAccessesForIndexes(dataset, indexes, fieldVarsForNewRecord, recType, metaType, newRecordVar,
                newMetaVar, primaryIndexModificationOp, false);
        if (replicateOp != null) {
            // Recompute the replicate's type environment after the field accesses were injected.
            context.computeAndSetTypeEnvironmentForOperator(replicateOp);
        }
    }
    if (primaryIndexModificationOp.getOperation() == Kind.UPSERT
    /* Actually, delete should be here but it is not until issue
     * https://issues.apache.org/jira/browse/ASTERIXDB-1507
     * is solved
     */) {
        List<LogicalVariable> beforeOpMetaVars = primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
        LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
        currentTop = injectFieldAccessesForIndexes(dataset, indexes, fieldVarsForBeforeOperation, recType, metaType,
                primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, currentTop, true);
    }
    // Add the appropriate SIDX maintenance operations.
    for (Index index : indexes) {
        if (!index.isSecondaryIndex()) {
            continue;
        }
        // Get the secondary fields names and types
        List<List<String>> secondaryKeyFields = null;
        List<IAType> secondaryKeyTypes = null;
        List<Integer> secondaryKeySources = null;
        switch (Index.IndexCategory.of(index.getIndexType())) {
            case VALUE:
                Index.ValueIndexDetails valueIndexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
                secondaryKeyFields = valueIndexDetails.getKeyFieldNames();
                secondaryKeyTypes = valueIndexDetails.getKeyFieldTypes();
                secondaryKeySources = valueIndexDetails.getKeyFieldSourceIndicators();
                break;
            case TEXT:
                Index.TextIndexDetails textIndexDetails = (Index.TextIndexDetails) index.getIndexDetails();
                secondaryKeyFields = textIndexDetails.getKeyFieldNames();
                secondaryKeyTypes = textIndexDetails.getKeyFieldTypes();
                secondaryKeySources = textIndexDetails.getKeyFieldSourceIndicators();
                break;
            case ARRAY:
                // These details are handled separately for array indexes.
                break;
            default:
                // Any other index category is not maintained by this rule.
                continue;
        }
        // Set our key variables and expressions for non-array indexes. Our secondary keys for array indexes will
        // always be an empty list.
        List<LogicalVariable> secondaryKeyVars = new ArrayList<>();
        List<LogicalVariable> beforeOpSecondaryKeyVars = new ArrayList<>();
        List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<>();
        List<Mutable<ILogicalExpression>> beforeOpSecondaryExpressions = new ArrayList<>();
        // The operator that is registered as an output of the replicate operator for this index (bulk load).
        ILogicalOperator replicateOutput;
        if (!index.getIndexType().equals(IndexType.ARRAY)) {
            for (int i = 0; i < secondaryKeyFields.size(); i++) {
                IAType skType = secondaryKeyTypes.get(i);
                Integer skSrc = secondaryKeySources.get(i);
                List<String> skName = secondaryKeyFields.get(i);
                // Without a meta part every key comes from the record type.
                ARecordType sourceType = dataset.hasMetaPart()
                        ? skSrc.intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
                IndexFieldId indexFieldId = createIndexFieldId(index, skName, skType, skSrc, sourceType, sourceLoc);
                LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
                secondaryKeyVars.add(skVar);
                VariableReferenceExpression skVarRef = new VariableReferenceExpression(skVar);
                skVarRef.setSourceLocation(sourceLoc);
                secondaryExpressions.add(new MutableObject<>(skVarRef));
                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                    LogicalVariable beforeKeyVar = fieldVarsForBeforeOperation.get(indexFieldId);
                    beforeOpSecondaryKeyVars.add(beforeKeyVar);
                    VariableReferenceExpression varRef = new VariableReferenceExpression(beforeKeyVar);
                    varRef.setSourceLocation(sourceLoc);
                    beforeOpSecondaryExpressions.add(new MutableObject<>(varRef));
                }
            }
        }
        IndexInsertDeleteUpsertOperator indexUpdate;
        if (index.getIndexType() != IndexType.RTREE) {
            // B-Tree, inverted index, array index
            // Create an expression per key
            Mutable<ILogicalExpression> filterExpression =
                    createFilterExpression(index, secondaryKeyVars, context.getOutputTypeEnvironment(currentTop),
                            index.getIndexDetails().isOverridingKeyFieldTypes());
            Mutable<ILogicalExpression> beforeOpFilterExpression = null;
            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                beforeOpFilterExpression = createFilterExpression(index, beforeOpSecondaryKeyVars,
                        context.getOutputTypeEnvironment(currentTop),
                        index.getIndexDetails().isOverridingKeyFieldTypes());
            }
            DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
            // Introduce the TokenizeOperator only when doing bulk-load,
            // and index type is keyword or n-gram.
            if (index.getIndexType() != IndexType.BTREE && index.getIndexType() != IndexType.ARRAY
                    && isBulkload) {
                // Note: Bulk load case, we don't need to take care of it for upsert operation
                // Check whether the index is length-partitioned or not.
                // If partitioned, [input variables to TokenizeOperator,
                // token, number of token] pairs will be generated and
                // fed into the IndexInsertDeleteOperator.
                // If not, [input variables, token] pairs will be generated
                // and fed into the IndexInsertDeleteOperator.
                // Input variables are passed since TokenizeOperator is not a
                // filtering operator.
                boolean isPartitioned = index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                        || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
                // Create a new logical variable - token
                List<LogicalVariable> tokenizeKeyVars = new ArrayList<>();
                List<Mutable<ILogicalExpression>> tokenizeKeyExprs = new ArrayList<>();
                LogicalVariable tokenVar = context.newVar();
                tokenizeKeyVars.add(tokenVar);
                VariableReferenceExpression tokenVarRef = new VariableReferenceExpression(tokenVar);
                tokenVarRef.setSourceLocation(sourceLoc);
                tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(tokenVarRef));
                // Check the field type of the secondary key.
                IAType secondaryKeyType;
                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index,
                        secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
                secondaryKeyType = keyPairType.first;
                List<Object> varTypes = new ArrayList<>();
                varTypes.add(NonTaggedFormatUtil.getTokenType(secondaryKeyType));
                // If the index is a length-partitioned, then create
                // additional variable - number of token.
                // We use a special type for the length-partitioned index.
                // The type is short, and this does not contain type info.
                if (isPartitioned) {
                    LogicalVariable lengthVar = context.newVar();
                    tokenizeKeyVars.add(lengthVar);
                    VariableReferenceExpression lengthVarRef = new VariableReferenceExpression(lengthVar);
                    lengthVarRef.setSourceLocation(sourceLoc);
                    tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(lengthVarRef));
                    varTypes.add(BuiltinType.SHORTWITHOUTTYPEINFO);
                }
                // TokenizeOperator to tokenize [SK, PK] pairs
                TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
                        OperatorManipulationUtil
                                .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                        secondaryExpressions, tokenizeKeyVars,
                        filterExpression != null
                                ? new MutableObject<>(filterExpression.getValue().cloneExpression()) : null,
                        primaryIndexModificationOp.getOperation(), isBulkload, isPartitioned, varTypes);
                tokenUpdate.setSourceLocation(sourceLoc);
                tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
                replicateOutput = tokenUpdate;
                indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                        OperatorManipulationUtil
                                .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                        tokenizeKeyExprs, filterExpression, beforeOpFilterExpression,
                        primaryIndexModificationOp.getOperation(), isBulkload,
                        primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                indexUpdate.setSourceLocation(sourceLoc);
                indexUpdate.setAdditionalFilteringExpressions(
                        OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
            } else {
                // When TokenizeOperator is not needed
                indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                        OperatorManipulationUtil
                                .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                        secondaryExpressions, filterExpression, beforeOpFilterExpression,
                        primaryIndexModificationOp.getOperation(), isBulkload,
                        primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                indexUpdate.setSourceLocation(sourceLoc);
                indexUpdate.setAdditionalFilteringExpressions(
                        OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                replicateOutput = indexUpdate;
                // We add the necessary expressions for upsert
                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                    indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                    if (filteringFields != null) {
                        VariableReferenceExpression varRef =
                                new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar());
                        varRef.setSourceLocation(sourceLoc);
                        indexUpdate.setBeforeOpAdditionalFilteringExpression(
                                new MutableObject<ILogicalExpression>(varRef));
                    }
                }
                indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                // For array indexes we have no secondary keys to reference. We must add separate branches to
                // first extract our keys.
                if (index.getIndexType() == IndexType.ARRAY && !isBulkload) {
                    NestedTupleSourceOperator unnestSourceOp =
                            new NestedTupleSourceOperator(new MutableObject<>(indexUpdate));
                    unnestSourceOp.setSourceLocation(sourceLoc);
                    context.computeAndSetTypeEnvironmentForOperator(unnestSourceOp);
                    UnnestBranchCreator unnestSIDXBranch = buildUnnestBranch(unnestSourceOp, index, newRecordVar,
                            newMetaVar, recType, metaType, dataset.hasMetaPart());
                    unnestSIDXBranch.applyProjectOnly();
                    // If there exists a filter expression, add it to the top of our nested plan.
                    // No filter is built for UPSERT.
                    filterExpression = (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
                            : createAnyUnknownFilterExpression(unnestSIDXBranch.lastFieldVars,
                                    context.getOutputTypeEnvironment(unnestSIDXBranch.currentTop),
                                    index.getIndexDetails().isOverridingKeyFieldTypes());
                    if (filterExpression != null) {
                        unnestSIDXBranch.applyFilteringExpression(filterExpression);
                    }
                    // Finalize our nested plan.
                    ILogicalPlan unnestPlan = unnestSIDXBranch.buildBranch();
                    indexUpdate.getNestedPlans().add(unnestPlan);
                    // If we have an UPSERT, then create and add a branch to extract our old keys as well.
                    if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                        NestedTupleSourceOperator unnestBeforeSourceOp =
                                new NestedTupleSourceOperator(new MutableObject<>(indexUpdate));
                        unnestBeforeSourceOp.setSourceLocation(sourceLoc);
                        context.computeAndSetTypeEnvironmentForOperator(unnestBeforeSourceOp);
                        List<LogicalVariable> beforeOpMetaVars =
                                primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
                        LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
                        UnnestBranchCreator unnestBeforeSIDXBranch = buildUnnestBranch(unnestBeforeSourceOp, index,
                                primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, recType,
                                metaType, dataset.hasMetaPart());
                        unnestBeforeSIDXBranch.applyProjectOnly();
                        indexUpdate.getNestedPlans().add(unnestBeforeSIDXBranch.buildBranch());
                    }
                } else if (index.getIndexType() == IndexType.ARRAY && isBulkload) {
                    // If we have a bulk load, we must sort the entire input by <SK, PK>. Do not use any
                    // nested plans here.
                    UnnestBranchCreator unnestSIDXBranch = buildUnnestBranch(currentTop, index, newRecordVar,
                            newMetaVar, recType, metaType, dataset.hasMetaPart());
                    unnestSIDXBranch.applyProjectDistinct(primaryIndexModificationOp.getPrimaryKeyExpressions(),
                            primaryIndexModificationOp.getAdditionalFilteringExpressions());
                    // Replace the direct input added above with the unnest branch.
                    indexUpdate.getInputs().clear();
                    introduceNewOp(unnestSIDXBranch.currentTop, indexUpdate, true);
                    // Update the secondary expressions of our index.
                    secondaryExpressions = new ArrayList<>();
                    for (LogicalVariable var : unnestSIDXBranch.lastFieldVars) {
                        VariableReferenceExpression unnestKeyVarRef = new VariableReferenceExpression(var);
                        unnestKeyVarRef.setSourceLocation(sourceLoc);
                        secondaryExpressions.add(new MutableObject<ILogicalExpression>(unnestKeyVarRef));
                    }
                    indexUpdate.setSecondaryKeyExprs(secondaryExpressions);
                    // Update the filter expression to include these new keys.
                    filterExpression = createAnyUnknownFilterExpression(unnestSIDXBranch.lastFieldVars,
                            context.getOutputTypeEnvironment(unnestSIDXBranch.currentTop),
                            index.getIndexDetails().isOverridingKeyFieldTypes());
                    indexUpdate.setFilterExpression(filterExpression);
                    if (replicateOp != null) {
                        // If we have a replicate, then update the replicate operator to include this branch.
                        // The bottom of the unnest branch (not the index operator) hangs off the replicate, so
                        // the generic wiring at the end of the loop body is skipped for this index.
                        replicateOp.getOutputs().add(new MutableObject<>(unnestSIDXBranch.currentBottom));
                        op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
                        continue;
                    }
                }
            }
        } else {
            // Get type, dimensions and number of keys
            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index, secondaryKeyTypes.get(0),
                    secondaryKeyFields.get(0), recType);
            IAType spatialType = keyPairType.first;
            boolean isPointMBR =
                    spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
            int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
            int numKeys = (isPointMBR && isBulkload) ? dimension : dimension * 2;
            // Get variables and expressions
            List<LogicalVariable> keyVarList = new ArrayList<>();
            List<Mutable<ILogicalExpression>> keyExprList = new ArrayList<>();
            for (int i = 0; i < numKeys; i++) {
                LogicalVariable keyVar = context.newVar();
                keyVarList.add(keyVar);
                // create-mbr(secondaryKey, dimension, i) yields the i-th coordinate.
                AbstractFunctionCallExpression createMBR =
                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CREATE_MBR));
                createMBR.setSourceLocation(sourceLoc);
                VariableReferenceExpression secondaryKeyVarRef =
                        new VariableReferenceExpression(secondaryKeyVars.get(0));
                secondaryKeyVarRef.setSourceLocation(sourceLoc);
                createMBR.getArguments().add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                        new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
                createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                        new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                keyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
            }
            // The index operator references the coordinate variables instead of the original key variable.
            secondaryExpressions.clear();
            for (LogicalVariable secondaryKeyVar : keyVarList) {
                VariableReferenceExpression secondaryKeyVarRef = new VariableReferenceExpression(secondaryKeyVar);
                secondaryKeyVarRef.setSourceLocation(sourceLoc);
                secondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
            }
            if (isPointMBR && isBulkload) {
                //for PointMBR optimization: see SecondaryRTreeOperationsHelper.buildLoadingJobSpec() and
                //createFieldPermutationForBulkLoadOp(int) for more details.
                // The coordinate references are appended a second time.
                for (LogicalVariable secondaryKeyVar : keyVarList) {
                    VariableReferenceExpression secondaryKeyVarRef =
                            new VariableReferenceExpression(secondaryKeyVar);
                    secondaryKeyVarRef.setSourceLocation(sourceLoc);
                    secondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                }
            }
            AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
            assignCoordinates.setSourceLocation(sourceLoc);
            assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
            context.computeAndSetTypeEnvironmentForOperator(assignCoordinates);
            replicateOutput = assignCoordinates;
            boolean forceFilter = keyPairType.second;
            Mutable<ILogicalExpression> filterExpression = createAnyUnknownFilterExpression(keyVarList,
                    context.getOutputTypeEnvironment(assignCoordinates), forceFilter);
            AssignOperator originalAssignCoordinates = null;
            Mutable<ILogicalExpression> beforeOpFilterExpression = null;
            // We do something similar for beforeOp key if the operation is an upsert
            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                List<LogicalVariable> originalKeyVarList = new ArrayList<>();
                List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<>();
                // we don't do any filtering since nulls are expected here and there
                for (int i = 0; i < numKeys; i++) {
                    LogicalVariable keyVar = context.newVar();
                    originalKeyVarList.add(keyVar);
                    AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                            FunctionUtil.getFunctionInfo(BuiltinFunctions.CREATE_MBR));
                    createMBR.setSourceLocation(sourceLoc);
                    createMBR.getArguments().add(
                            new MutableObject<>(beforeOpSecondaryExpressions.get(0).getValue().cloneExpression()));
                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                            new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                            new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                    originalKeyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                }
                beforeOpSecondaryExpressions.clear();
                for (LogicalVariable secondaryKeyVar : originalKeyVarList) {
                    VariableReferenceExpression secondaryKeyVarRef =
                            new VariableReferenceExpression(secondaryKeyVar);
                    secondaryKeyVarRef.setSourceLocation(sourceLoc);
                    beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                }
                originalAssignCoordinates = new AssignOperator(originalKeyVarList, originalKeyExprList);
                originalAssignCoordinates.setSourceLocation(sourceLoc);
                originalAssignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                context.computeAndSetTypeEnvironmentForOperator(originalAssignCoordinates);
                beforeOpFilterExpression = createAnyUnknownFilterExpression(originalKeyVarList,
                        context.getOutputTypeEnvironment(originalAssignCoordinates), forceFilter);
            }
            DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
            indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                    OperatorManipulationUtil
                            .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                    secondaryExpressions, filterExpression, beforeOpFilterExpression,
                    primaryIndexModificationOp.getOperation(), isBulkload,
                    primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                            : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
            indexUpdate.setSourceLocation(sourceLoc);
            indexUpdate.setAdditionalFilteringExpressions(
                    OperatorManipulationUtil.cloneExpressions(filteringExpressions));
            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                // set the before-op filtering expression
                if (filteringFields != null) {
                    VariableReferenceExpression varRef =
                            new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar());
                    varRef.setSourceLocation(sourceLoc);
                    indexUpdate.setBeforeOpAdditionalFilteringExpression(
                            new MutableObject<ILogicalExpression>(varRef));
                }
                // set before op secondary key expressions
                indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                // assign --> assign beforeOp values --> secondary index upsert
                indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(originalAssignCoordinates));
            } else {
                indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
            }
        }
        if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
            VariableReferenceExpression operationVarRef =
                    new VariableReferenceExpression(primaryIndexModificationOp.getOperationVar());
            operationVarRef.setSourceLocation(sourceLoc);
            indexUpdate.setOperationExpr(new MutableObject<ILogicalExpression>(operationVarRef));
        }
        context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
        if (!isBulkload || secondaryIndexTotalCnt == 1) {
            // Chain: the next index operator (if any) consumes this one.
            currentTop = indexUpdate;
        } else {
            // replicateOp is non-null here: it is created whenever bulk loading more than one index.
            replicateOp.getOutputs().add(new MutableObject<>(replicateOutput));
            /* special treatment for bulk load with the existence of secondary primary index.
             * the branch coming out of the replicate operator and feeding the index will not have the usual
             * "blocking" sort operator since tuples are already sorted. We mark the materialization flag for that
             * branch to make it blocking. Without "blocking", the activity cluster graph would be messed up
             */
            if (index.isPrimaryKeyIndex()) {
                int positionOfSecondaryPrimaryIndex = replicateOp.getOutputs().size() - 1;
                replicateOp.getOutputMaterializationFlags()[positionOfSecondaryPrimaryIndex] = true;
            }
        }
        if (isBulkload) {
            // For bulk load, we connect all fanned out insert operator to a single SINK operator
            op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
        }
    }
    if (!isBulkload) {
        // If this is an upsert, we need to
        // Remove the current input to the SINK operator (It is actually already removed above)
        op0.getInputs().clear();
        // Connect the last index update to the SINK
        op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
    }
    return true;
}