in asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADExpressionToPlanTranslator.java [120:192]
/**
 * Assembles the logical operators for an UPSERT into {@code targetDatasource}.
 * <p>
 * The chain built here is: {@code pkeyAssignOp}, then (only when the dataset has a meta part) an assign
 * operator producing the meta record, then the upsert operator, then a delegate operator wrapping a
 * {@code CommitOperator}. That chain is finally passed to {@code processReturningExpression}.
 *
 * @param targetDatasource data source that receives the upsert
 * @param payloadVarRef forwarded unchanged to the upsert operator constructor
 * @param varRefsForLoading forwarded unchanged to the upsert operator constructor
 * @param pkeyAssignOp operator used as the input of the upsert (or of the meta assign, when one is created)
 * @param unnestVar not read by this implementation
 * @param topOp not read by this implementation
 * @param pkeyExprs not read by this implementation
 * @param seqVar variable handed to {@code generatedFilterExprs} when the filter source indicator is 0
 * @param stmt statement being compiled; it is cast to {@code CompiledUpsertStatement} and supplies the source
 *            location and the returning expression
 * @param resultMetadata forwarded unchanged to {@code processReturningExpression}
 * @return whatever {@code processReturningExpression} returns for the assembled chain
 * @throws AlgebricksException propagated from the helper calls made while building the plan
 */
protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource,
        Mutable<ILogicalExpression> payloadVarRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
        ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, ILogicalOperator topOp,
        List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable seqVar,
        CompiledStatements.ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
    SourceLocation srcLoc = stmt.getSourceLocation();
    CompiledStatements.CompiledUpsertStatement upsertStmt = (CompiledStatements.CompiledUpsertStatement) stmt;
    Expression returningExpr = upsertStmt.getReturnExpression();
    ARecordType payloadType = (ARecordType) targetDatasource.getItemType();

    // NOTE(review): the relative order of the context.newVar() calls below is kept exactly as before;
    // presumably it determines the numbering of the generated variables -- confirm before reordering.
    ILogicalOperator upsertInput = pkeyAssignOp;
    LogicalVariable metaRecordVar = null;
    InsertDeleteUpsertOperator upsert;
    if (!targetDatasource.getDataset().hasMetaPart()) {
        upsert = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
                InsertDeleteUpsertOperator.Kind.UPSERT, false);
    } else {
        // Insert an assign that produces the meta record between pkeyAssignOp and the upsert.
        IAType metaRecordType = metadataProvider.findMetaType(targetDatasource.getDataset());
        metaRecordVar = context.newVar();
        Mutable<ILogicalExpression> metaRecordExpr = new MutableObject<>(makeMetaRecordExpr(metaRecordType));
        AssignOperator metaAssign = new AssignOperator(metaRecordVar, metaRecordExpr);
        metaAssign.setSourceLocation(srcLoc);
        metaAssign.getInputs().add(new MutableObject<>(upsertInput));
        upsertInput = metaAssign;

        // The upsert receives a reference to the freshly assigned meta variable.
        List<Mutable<ILogicalExpression>> metaPayloadExprs = new ArrayList<>(1);
        metaPayloadExprs.add(new MutableObject<>(new VariableReferenceExpression(metaRecordVar)));
        upsert = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
                metaPayloadExprs, InsertDeleteUpsertOperator.Kind.UPSERT, false);

        // A new variable, typed with the data source's meta item type, for the previous meta record.
        List<LogicalVariable> prevMetaVars = new ArrayList<>();
        prevMetaVars.add(context.newVar());
        upsert.setPrevAdditionalNonFilteringVars(prevMetaVars);
        List<Object> prevMetaTypes = new ArrayList<>();
        prevMetaTypes.add(targetDatasource.getMetaItemType());
        upsert.setPrevAdditionalNonFilteringTypes(prevMetaTypes);
    }

    // New variables for the upsert indicator (boolean) and for the previous record.
    upsert.setUpsertIndicatorVar(context.newVar());
    upsert.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
    upsert.setPrevRecordVar(context.newVar());
    upsert.setPrevRecordType(payloadType);
    upsert.setSourceLocation(srcLoc);
    upsert.getInputs().add(new MutableObject<>(upsertInput));

    List<String> filterFieldName = DatasetUtil.getFilterField(targetDatasource.getDataset());
    Integer filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset());
    if (filterFieldName == null) {
        upsert.setAdditionalFilteringExpressions(null);
    } else {
        // Indicator 0 selects the record (seqVar / item type); any other value selects the meta record.
        // NOTE(review): the Integer is unboxed here, and metaRecordVar is null when the dataset has no
        // meta part -- assumes DatasetUtil never reports such a combination; verify.
        boolean filterOnRecord = filterSourceIndicator == 0;
        LogicalVariable filterSourceVar;
        if (filterOnRecord) {
            filterSourceVar = seqVar;
        } else {
            filterSourceVar = metaRecordVar;
        }
        List<Mutable<ILogicalExpression>> filterExprs =
                generatedFilterExprs(upsert, filterFieldName, filterSourceVar, srcLoc);

        ARecordType filterSourceType;
        if (filterOnRecord) {
            filterSourceType = (ARecordType) targetDatasource.getItemType();
        } else {
            filterSourceType = (ARecordType) targetDatasource.getMetaItemType();
        }
        upsert.setAdditionalFilteringExpressions(filterExprs);
        upsert.setPrevFilterVar(context.newVar());
        upsert.setPrevFilterType(filterSourceType.getFieldType(filterFieldName.get(0)));
    }

    // Wrap a commit operator in a delegate on top of the upsert; the commit's boolean flag is true
    // exactly when the statement has no returning expression.
    boolean noReturningExpr = returningExpr == null;
    DelegateOperator commitDelegate = new DelegateOperator(new CommitOperator(noReturningExpr));
    commitDelegate.getInputs().add(new MutableObject<>(upsert));
    commitDelegate.setSourceLocation(srcLoc);

    // Compile the returning expression (if any) on top of the commit delegate.
    return processReturningExpression(commitDelegate, upsert, upsertStmt, resultMetadata);
}