in phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java [354:883]
public MutationPlan compile(UpsertStatement upsert) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final long maxSizeBytes = services.getProps()
.getLongBytes(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
List<ColumnName> columnNodes = upsert.getColumns();
TableRef tableRefToBe = null;
PTable table = null;
Set<PColumn> addViewColumnsToBe = Collections.emptySet();
Set<PColumn> overlapViewColumnsToBe = Collections.emptySet();
List<PColumn> allColumnsToBe = Collections.emptyList();
boolean isTenantSpecific = false;
boolean isSharedViewIndex = false;
String tenantIdStr = null;
ColumnResolver resolver = null;
int[] columnIndexesToBe;
int nColumnsToSet = 0;
int[] pkSlotIndexesToBe;
List<ParseNode> valueNodes = upsert.getValues();
List<PColumn> targetColumns;
NamedTableNode tableNode = upsert.getTable();
String tableName = tableNode.getName().getTableName();
String schemaName = tableNode.getName().getSchemaName();
QueryPlan queryPlanToBe = null;
int nValuesToSet;
boolean sameTable = false;
boolean runOnServer = false;
boolean serverUpsertSelectEnabled =
services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT,
QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT);
boolean allowServerMutations =
services.getProps().getBoolean(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
QueryServicesOptions.DEFAULT_ENABLE_SERVER_SIDE_UPSERT_MUTATIONS);
UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
boolean useServerTimestampToBe = false;
resolver = FromCompiler.getResolverForMutation(upsert, connection);
tableRefToBe = resolver.getTables().get(0);
table = tableRefToBe.getTable();
// Cannot update:
// - read-only VIEW
// - transactional table with a connection having an SCN
// - table with indexes and SCN set
// - tables with ROW_TIMESTAMP columns
if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
throw new ReadOnlyTableException(schemaName,tableName);
} else if (connection.isBuildingIndex() && table.getType() != PTableType.INDEX) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_INDEX_UPDATABLE_AT_SCN)
.setSchemaName(schemaName)
.setTableName(tableName)
.build().buildException();
} else if (table.isTransactional() && connection.getSCN() != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode
.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
} else if (connection.getSCN() != null && !table.getIndexes().isEmpty()
&& !connection.isRunningUpgrade() && !connection.isBuildingIndex()) {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode
.CANNOT_UPSERT_WITH_SCN_FOR_TABLE_WITH_INDEXES)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
} else if(connection.getSCN() != null && !connection.isRunningUpgrade()
&& !connection.isBuildingIndex() && table.getRowTimestampColPos() >= 0) {
throw new SQLExceptionInfo
.Builder(SQLExceptionCode
.CANNOT_UPSERT_WITH_SCN_FOR_ROW_TIMESTAMP_COLUMN)
.setSchemaName(schemaName)
.setTableName(tableName).build().buildException();
}
boolean isSalted = table.getBucketNum() != null;
isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
isSharedViewIndex = table.getViewIndexId() != null;
tenantIdStr = isTenantSpecific ? connection.getTenantId().getString() : null;
int posOffset = isSalted ? 1 : 0;
// Setup array of column indexes parallel to values that are going to be set
allColumnsToBe = table.getColumns();
nColumnsToSet = 0;
if (table.getViewType() == ViewType.UPDATABLE) {
addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size());
for (PColumn column : allColumnsToBe) {
if (column.getViewConstant() != null) {
addViewColumnsToBe.add(column);
}
}
}
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// Allow full row upsert if no columns or only dynamic ones are specified and values count match
if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) {
nColumnsToSet = allColumnsToBe.size() - posOffset;
columnIndexesToBe = new int[nColumnsToSet];
pkSlotIndexesToBe = new int[columnIndexesToBe.length];
targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
int minPKPos = 0;
if (isSharedViewIndex) {
PColumn indexIdColumn = table.getPKColumns().get(minPKPos);
columnIndexesToBe[minPKPos] = indexIdColumn.getPosition();
targetColumns.set(minPKPos, indexIdColumn);
minPKPos++;
}
if (isTenantSpecific) {
PColumn tenantColumn = table.getPKColumns().get(minPKPos);
columnIndexesToBe[minPKPos] = tenantColumn.getPosition();
targetColumns.set(minPKPos, tenantColumn);
minPKPos++;
}
for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) {
PColumn column = allColumnsToBe.get(i);
if (SchemaUtil.isPKColumn(column)) {
pkSlotIndexesToBe[i-posOffset] = j + posOffset;
if (j++ < minPKPos) { // Skip, as it's already been set above
continue;
}
minPKPos = 0;
}
columnIndexesToBe[i-posOffset+minPKPos] = i;
targetColumns.set(i-posOffset+minPKPos, column);
}
if (!addViewColumnsToBe.isEmpty()) {
// All view columns overlap in this case
overlapViewColumnsToBe = addViewColumnsToBe;
addViewColumnsToBe = Collections.emptySet();
}
} else {
// Size for worse case
int numColsInUpsert = columnNodes.size();
nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + + (isSharedViewIndex ? 1 : 0);
columnIndexesToBe = new int[nColumnsToSet];
pkSlotIndexesToBe = new int[columnIndexesToBe.length];
targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length);
targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null));
Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced
BitSet columnsBeingSet = new BitSet(table.getColumns().size());
int i = 0;
if (isSharedViewIndex) {
PColumn indexIdColumn = table.getPKColumns().get(i + posOffset);
columnsBeingSet.set(columnIndexesToBe[i] = indexIdColumn.getPosition());
pkSlotIndexesToBe[i] = i + posOffset;
targetColumns.set(i, indexIdColumn);
i++;
}
// Add tenant column directly, as we don't want to resolve it as this will fail
if (isTenantSpecific) {
PColumn tenantColumn = table.getPKColumns().get(i + posOffset);
columnsBeingSet.set(columnIndexesToBe[i] = tenantColumn.getPosition());
pkSlotIndexesToBe[i] = i + posOffset;
targetColumns.set(i, tenantColumn);
i++;
}
for (ColumnName colName : columnNodes) {
ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName());
PColumn column = ref.getColumn();
if (IndexUtil.getViewConstantValue(column, ptr)) {
if (overlapViewColumnsToBe.isEmpty()) {
overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size());
}
nColumnsToSet--;
overlapViewColumnsToBe.add(column);
addViewColumnsToBe.remove(column);
}
columnsBeingSet.set(columnIndexesToBe[i] = ref.getColumnPosition());
targetColumns.set(i, column);
if (SchemaUtil.isPKColumn(column)) {
pkSlotIndexesToBe[i] = ref.getPKSlotPosition();
}
i++;
}
for (PColumn column : addViewColumnsToBe) {
columnsBeingSet.set(columnIndexesToBe[i] = column.getPosition());
targetColumns.set(i, column);
if (SchemaUtil.isPKColumn(column)) {
pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column);
}
i++;
}
// If a table has rowtimestamp col, then we always set it.
useServerTimestampToBe = table.getRowTimestampColPos() != -1 && !isRowTimestampSet(pkSlotIndexesToBe, table);
if (useServerTimestampToBe) {
PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
// Need to resize columnIndexesToBe and pkSlotIndexesToBe to include this extra column.
columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, columnIndexesToBe.length + 1);
pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, pkSlotIndexesToBe.length + 1);
columnsBeingSet.set(columnIndexesToBe[i] = rowTimestampCol.getPosition());
pkSlotIndexesToBe[i] = table.getRowTimestampColPos();
targetColumns.add(rowTimestampCol);
if (valueNodes != null && !valueNodes.isEmpty()) {
valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol));
}
nColumnsToSet++;
}
for (i = posOffset; i < table.getColumns().size(); i++) {
PColumn column = table.getColumns().get(i);
if (!columnsBeingSet.get(i) && !column.isNullable() && column.getExpressionStr() == null) {
throw new ConstraintViolationException(table.getName().getString() + "."
+ SchemaUtil.getColumnDisplayName(column) + " may not be null");
}
}
}
boolean isAutoCommit = connection.getAutoCommit();
if (valueNodes == null) {
SelectStatement select = upsert.getSelect();
assert(select != null);
select = SubselectRewriter.flatten(select, connection);
ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection, false, upsert.getTable().getName());
select = StatementNormalizer.normalize(select, selectResolver);
select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe, useServerTimestampToBe);
SelectStatement transformedSelect = SubqueryRewriter.transform(select, selectResolver, connection);
if (transformedSelect != select) {
selectResolver = FromCompiler.getResolverForQuery(transformedSelect, connection, false, upsert.getTable().getName());
select = StatementNormalizer.normalize(transformedSelect, selectResolver);
}
sameTable = !select.isJoin()
&& tableRefToBe.equals(selectResolver.getTables().get(0));
/* We can run the upsert in a coprocessor if:
* 1) from has only 1 table or server UPSERT SELECT is enabled
* 2) the select query isn't doing aggregation (which requires a client-side final merge)
* 3) autoCommit is on
* 4) the table is not immutable with indexes, as the client is the one that figures out the additional
* puts for index tables.
* 5) no limit clause, as the limit clause requires client-side post processing
* 6) no sequences, as sequences imply that the order of upsert must match the order of
* selection. TODO: change this and only force client side if there's a ORDER BY on the sequence value
* Otherwise, run the query to pull the data from the server
* and populate the MutationState (upto a limit).
*/
if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) {
// We can pipeline the upsert select instead of spooling everything to disk first,
// if we don't have any post processing that's required.
parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe);
// If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
// so we might be able to run it entirely on the server side.
// region space managed by region servers. So we bail out on executing on server side.
// Disable running upsert select on server side if a table has global mutable secondary indexes on it
boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows();
boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery();
runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit
// We can run the upsert select for initial index population on server side for transactional
// tables since the writes do not need to be done transactionally, since we gate the index
// usage on successfully writing all data rows.
&& (!table.isTransactional() || table.getType() == PTableType.INDEX)
&& !(table.isImmutableRows() && !table.getIndexes().isEmpty())
&& !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1;
}
runOnServer &= allowServerMutations;
// If we may be able to run on the server, add a hint that favors using the data table
// if all else is equal.
// TODO: it'd be nice if we could figure out in advance if the PK is potentially changing,
// as this would disallow running on the server. We currently use the row projector we
// get back to figure this out.
HintNode hint = upsert.getHint();
if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
}
select = SelectStatement.create(select, hint);
// Pass scan through if same table in upsert and select so that projection is computed correctly
// Use optimizer to choose the best plan
QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), true, false, null);
queryPlanToBe = compiler.compile();
if (sameTable) {
// in the UPSERT INTO X ... SELECT FROM X case enforce the source tableRef's TS
// as max TS, so that the query can safely restarted and still work of a snapshot
// (so it won't see its own data in case of concurrent splits)
// see PHOENIX-4849
long serverTime = selectResolver.getTables().get(0).getTimeStamp();
if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
// if this is the first time this table is resolved the ref's current time might not be defined, yet
// in that case force an RPC to get the server time
serverTime = new MetaDataClient(connection).getCurrentTime(schemaName, tableName);
}
Scan scan = queryPlanToBe.getContext().getScan();
ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), serverTime);
}
// This is post-fix: if the tableRef is a projected table, this means there are post-processing
// steps and parallelIteratorFactory did not take effect.
if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {
parallelIteratorFactoryToBe = null;
}
nValuesToSet = queryPlanToBe.getProjector().getColumnCount();
// Cannot auto commit if doing aggregation or topN or salted
// Salted causes problems because the row may end up living on a different region
} else {
nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
}
// Resize down to allow a subset of columns to be specifiable
if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) {
nColumnsToSet = nValuesToSet;
columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet);
pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet);
for (int i = posOffset + nValuesToSet; i < table.getColumns().size(); i++) {
PColumn column = table.getColumns().get(i);
if (!column.isNullable() && column.getExpressionStr() == null) {
throw new ConstraintViolationException(table.getName().getString() + "."
+ SchemaUtil.getColumnDisplayName(column) + " may not be null");
}
}
}
if (nValuesToSet != nColumnsToSet) {
// We might have added columns, so refresh cache and try again if stale.
// We have logic to catch MetaNotFoundException and refresh cache in PhoenixStatement
// Note that this check is not really sufficient, as a column could have
// been removed and the added back and we wouldn't detect that here.
throw new UpsertColumnsValuesMismatchException(schemaName, tableName,
"Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet);
}
final QueryPlan originalQueryPlan = queryPlanToBe;
RowProjector projectorToBe = null;
// Optimize only after all checks have been performed
if (valueNodes == null) {
queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
projectorToBe = queryPlanToBe.getProjector();
}
final List<PColumn> allColumns = allColumnsToBe;
final RowProjector projector = projectorToBe;
final QueryPlan queryPlan = queryPlanToBe;
final TableRef tableRef = tableRefToBe;
final Set<PColumn> addViewColumns = addViewColumnsToBe;
final Set<PColumn> overlapViewColumns = overlapViewColumnsToBe;
final UpsertingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
final int[] columnIndexes = columnIndexesToBe;
final int[] pkSlotIndexes = pkSlotIndexesToBe;
final boolean useServerTimestamp = useServerTimestampToBe;
if (table.getRowTimestampColPos() == -1 && useServerTimestamp) {
throw new IllegalStateException("For a table without row timestamp column, useServerTimestamp cannot be true");
}
// TODO: break this up into multiple functions
////////////////////////////////////////////////////////////////////
// UPSERT SELECT
/////////////////////////////////////////////////////////////////////
if (valueNodes == null) {
// Before we re-order, check that for updatable view columns
// the projected expression either matches the column name or
// is a constant with the same required value.
throwIfNotUpdatable(tableRef, overlapViewColumnsToBe, targetColumns, projector, sameTable);
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run server-side (maybe)
/////////////////////////////////////////////////////////////////////
if (runOnServer) {
// At most this array will grow bigger by the number of PK columns
int[] allColumnsIndexes = Arrays.copyOf(columnIndexes, columnIndexes.length + nValuesToSet);
int[] reverseColumnIndexes = new int[table.getColumns().size()];
List<Expression> projectedExpressions = Lists.newArrayListWithExpectedSize(reverseColumnIndexes.length);
Arrays.fill(reverseColumnIndexes, -1);
for (int i =0; i < nValuesToSet; i++) {
projectedExpressions.add(projector.getColumnProjector(i).getExpression());
reverseColumnIndexes[columnIndexes[i]] = i;
}
/*
* Order projected columns and projected expressions with PK columns
* leading order by slot position
*/
int offset = table.getBucketNum() == null ? 0 : 1;
for (int i = 0; i < table.getPKColumns().size() - offset; i++) {
PColumn column = table.getPKColumns().get(i + offset);
int pos = reverseColumnIndexes[column.getPosition()];
if (pos == -1) {
// Last PK column may be fixed width and nullable
// We don't want to insert a null expression b/c
// it's not valid to set a fixed width type to null.
if (column.getDataType().isFixedWidth()) {
continue;
}
// Add literal null for missing PK columns
pos = projectedExpressions.size();
Expression literalNull = LiteralExpression.newConstant(null, column.getDataType(), Determinism.ALWAYS);
projectedExpressions.add(literalNull);
allColumnsIndexes[pos] = column.getPosition();
}
// Swap select expression at pos with i
Collections.swap(projectedExpressions, i, pos);
// Swap column indexes and reverse column indexes too
int tempPos = allColumnsIndexes[i];
allColumnsIndexes[i] = allColumnsIndexes[pos];
allColumnsIndexes[pos] = tempPos;
reverseColumnIndexes[tempPos] = pos;
reverseColumnIndexes[i] = i;
}
// If any pk slots are changing and server side UPSERT SELECT is disabled, do not run on server
if (!serverUpsertSelectEnabled && ExpressionUtil
.isPkPositionChanging(new TableRef(table), projectedExpressions)) {
runOnServer = false;
}
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run server-side
/////////////////////////////////////////////////////////////////////
if (runOnServer) {
// Iterate through columns being projected
List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size());
int posOff = table.getBucketNum() != null ? 1 : 0;
for (int i = 0 ; i < projectedExpressions.size(); i++) {
// Must make new column if position has changed
PColumn column = allColumns.get(allColumnsIndexes[i]);
projectedColumns.add(column.getPosition() == i + posOff ? column : new PColumnImpl(column, i + posOff));
}
// Build table from projectedColumns
// Hack to add default column family to be used on server in case no value column is projected.
PTable projectedTable = PTableImpl.builderWithColumns(table, projectedColumns)
.setExcludedColumns(ImmutableList.of())
.setDefaultFamilyName(PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table)))
.build();
SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
StatementContext statementContext = queryPlan.getContext();
RowProjector aggProjectorToBe = ProjectionCompiler.compile(statementContext, select, GroupBy
.EMPTY_GROUP_BY);
statementContext.getAggregationManager().compile(queryPlan.getContext()
,GroupBy.EMPTY_GROUP_BY);
if (queryPlan.getProjector().projectEveryRow()) {
aggProjectorToBe = new RowProjector(aggProjectorToBe,true);
}
final RowProjector aggProjector = aggProjectorToBe;
/*
* Transfer over PTable representing subset of columns selected, but all PK columns.
* Move columns setting PK first in pkSlot order, adding LiteralExpression of null for any missing ones.
* Transfer over List<Expression> for projection.
* In region scan, evaluate expressions in order, collecting first n columns for PK and collection non PK in mutation Map
* Create the PRow and get the mutations, adding them to the batch
*/
final StatementContext context = queryPlan.getContext();
final Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserverConstants.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserverHelper.serialize(projectedTable));
scan.setAttribute(BaseScannerRegionObserverConstants.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserverHelper.serialize(projectedExpressions));
// Ignore order by - it has no impact
final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan);
return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
}
}
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run client-side
/////////////////////////////////////////////////////////////////////
return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
}
////////////////////////////////////////////////////////////////////
// UPSERT VALUES
/////////////////////////////////////////////////////////////////////
final byte[][] values = new byte[nValuesToSet][];
int nodeIndex = 0;
if (isSharedViewIndex) {
values[nodeIndex++] = table.getviewIndexIdType().toBytes(table.getViewIndexId());
}
if (isTenantSpecific) {
PName tenantId = connection.getTenantId();
values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, isSharedViewIndex);
}
final int nodeIndexOffset = nodeIndex;
// Allocate array based on size of all columns in table,
// since some values may not be set (if they're nullable).
final StatementContext context = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context);
final List<Expression> constantExpressions = Lists.newArrayListWithExpectedSize(valueNodes.size());
// First build all the expressions, as with sequences we want to collect them all first
// and initialize them in one batch
List<Pair<ColumnName, ParseNode>> jsonExpressions = Lists.newArrayList();
List<Pair<ColumnName, ParseNode>> nonPKColumns = Lists.newArrayList();
for (ParseNode valueNode : valueNodes) {
if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException();
}
PColumn column = allColumns.get(columnIndexes[nodeIndex]);
expressionBuilder.setColumn(column);
Expression expression = valueNode.accept(expressionBuilder);
if (expression.getDataType() != null && !expression.getDataType().isCastableTo(column.getDataType())) {
throw TypeMismatchException.newException(
expression.getDataType(), column.getDataType(), "expression: "
+ expression.toString() + " in column " + column);
}
if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) {
nonPKColumns.add(new Pair<>(
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
column.getName().getString()), valueNode));
} else if (valueNode.hasJsonExpression()) {
jsonExpressions.add(new Pair<>(
ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(),
column.getName().getString()), valueNode));
}
constantExpressions.add(expression);
nodeIndex++;
}
if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) {
jsonExpressions.addAll(nonPKColumns);
nonPKColumns.clear();
}
byte[] onDupKeyBytesToBe = null;
List<Pair<ColumnName, ParseNode>> onDupKeyPairs = upsert.getOnDupKeyPairs();
if (onDupKeyPairs != null) {
if (table.isImmutableRows()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE)
.setSchemaName(table.getSchemaName().getString())
.setTableName(table.getTableName().getString())
.build().buildException();
}
if (table.isTransactional()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL)
.setSchemaName(table.getSchemaName().getString())
.setTableName(table.getTableName().getString())
.build().buildException();
}
if (connection.getSCN() != null) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY)
.setSchemaName(table.getSchemaName().getString())
.setTableName(table.getTableName().getString())
.build().buildException();
}
if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
onDupKeyBytesToBe = PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
} else { // ON DUPLICATE KEY UPDATE;
onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, onDupKeyPairs, resolver);
}
} else if (!jsonExpressions.isEmpty()) {
onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, jsonExpressions, resolver);
}
final byte[] onDupKeyBytes = onDupKeyBytesToBe;
return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
}