in fe/src/main/java/org/apache/impala/analysis/InsertStmt.java [829:1037]
private void prepareExpressions(List<Column> selectExprTargetColumns,
    List<Expr> selectListExprs, FeTable tbl, Analyzer analyzer)
    throws AnalysisException {
  // Temporary lists of partition key exprs and names in an arbitrary order.
  List<Expr> tmpPartitionKeyExprs = new ArrayList<>();
  List<String> tmpPartitionKeyNames = new ArrayList<>();
  int numClusteringCols = (tbl instanceof FeHBaseTable) ? 0
      : tbl.getNumClusteringCols();
  boolean isKuduTable = table_ instanceof FeKuduTable;
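  // Names of the columns that participate in the Kudu table's partitioning
  // (hash and/or range); exprs for these columns are collected below so rows
  // can be routed via the Kudu partitioning API.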
  Set<String> kuduPartitionColumnNames = null;
  if (isKuduTable) {
    kuduPartitionColumnNames = getKuduPartitionColumnNames((FeKuduTable) table_);
  }
  IcebergPartitionSpec icebergPartSpec = null;
  if (isIcebergTarget()) {
    icebergPartSpec = ((FeIcebergTable) table_).getDefaultPartitionSpec();
  }
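  // For set operations (e.g. UNION), the statement tracks the widest-typed expr per
  // result column; these are passed to the type-compatibility checks below so the
  // implicit casts are based on the widest operand type.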
  SetOperationStmt unionStmt =
      (queryStmt_ instanceof SetOperationStmt) ? (SetOperationStmt) queryStmt_ : null;
  List<Expr> widestTypeExprList = null;
  if (unionStmt != null && unionStmt.getWidestExprs() != null
      && unionStmt.getWidestExprs().size() > 0) {
    widestTypeExprList = unionStmt.getWidestExprs();
  }
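  // Kudu stores TIMESTAMP values as microseconds since the Unix epoch, conventionally
  // interpreted as UTC; with WRITE_KUDU_UTC_TIMESTAMPS set, written timestamps are
  // converted from the local time zone to UTC below.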
  boolean convertToUtc =
      isKuduTable && analyzer.getQueryOptions().isWrite_kudu_utc_timestamps();
  // Check dynamic partition columns for type compatibility.
  for (int i = 0; i < selectListExprs.size(); ++i) {
    Column targetColumn = selectExprTargetColumns.get(i);
    // widestTypeExpr is the widest-typed expression for column i, if available.
    Expr widestTypeExpr =
        (widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
    Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
        targetColumn, selectListExprs.get(i), analyzer, widestTypeExpr);
    if (targetColumn.getPosition() < numClusteringCols) {
      // This is a dynamic clustering column.
      tmpPartitionKeyExprs.add(compatibleExpr);
      tmpPartitionKeyNames.add(targetColumn.getName());
    } else if (isKuduTable) {
      if (kuduPartitionColumnNames.contains(targetColumn.getName())) {
        tmpPartitionKeyExprs.add(compatibleExpr);
        tmpPartitionKeyNames.add(targetColumn.getName());
      }
    }
    selectListExprs.set(i, compatibleExpr);
  }
  // Check static partition columns; dynamic entries in partitionKeyValues_ are
  // already in selectExprTargetColumns and are therefore ignored in this loop.
  if (partitionKeyValues_ != null) {
    for (PartitionKeyValue pkv : partitionKeyValues_) {
      if (pkv.isStatic()) {
        // The table column is guaranteed to exist after the earlier analysis checks.
        Column tableColumn = table_.getColumn(pkv.getColName());
        Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
            tableColumn, pkv.getLiteralValue(), analyzer, null);
        tmpPartitionKeyExprs.add(compatibleExpr);
        tmpPartitionKeyNames.add(pkv.getColName());
      }
    }
  }
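  // For example, given a table partitioned by (p1, p2) and
  //   INSERT INTO t PARTITION (p1 = 1, p2) SELECT c1, c2, p2 FROM src
  // the static literal 1 and the dynamic expr for p2 both land in
  // tmpPartitionKeyExprs at this point, in an order that is fixed up below.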
  if (isIcebergTarget()) {
    // Add partition key expressions in the order of the Iceberg partition fields.
    IcebergUtil.populatePartitionExprs(
        analyzer, widestTypeExprList, selectExprTargetColumns, selectListExprs,
        (FeIcebergTable) table_, partitionKeyExprs_, partitionColPos_);
  } else {
    // Reorder the partition key exprs and names to be consistent with the target
    // table's declaration, and store their column positions. The exprs are needed
    // in the declared order to create the corresponding HDFS directory structure
    // correctly, and the positions to construct rows for the Kudu partitioning API.
    for (int i = 0; i < table_.getColumns().size(); ++i) {
      Column c = table_.getColumns().get(i);
      for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
        if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
          Expr expr = tmpPartitionKeyExprs.get(j);
          if (convertToUtc && expr.getType().isTimestamp()) {
            expr = ExprUtil.toUtcTimestampExpr(
                analyzer, expr, true /*expectPreIfNonUnique*/);
          }
          partitionKeyExprs_.add(expr);
          partitionColPos_.add(i);
          break;
        }
      }
    }
  }
  if (isIcebergTarget() && icebergPartSpec.hasPartitionFields()) {
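    // VOID-transformed partition fields correspond to dropped partition columns and
    // contribute no partition key exprs, so count only the non-VOID fields.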
    int parts = 0;
    for (IcebergPartitionField pField : icebergPartSpec.getIcebergPartitionFields()) {
      if (pField.getTransformType() != TIcebergPartitionTransformType.VOID) {
        ++parts;
      }
    }
    if (CollectionUtils.isEmpty(columnPermutation_)) {
      Preconditions.checkState(partitionKeyExprs_.size() == parts);
    }
  } else if (isKuduTable) {
    Preconditions.checkState(
        partitionKeyExprs_.size() == kuduPartitionColumnNames.size());
  } else {
    Preconditions.checkState(partitionKeyExprs_.size() == numClusteringCols);
  }
  // Make sure we have stats for partitionKeyExprs.
  for (Expr expr : partitionKeyExprs_) {
    expr.analyze(analyzer);
  }
  // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
  // order, and add NULL expressions for all missing columns, unless this is an
  // UPSERT.
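  // getColumnsInHiveOrder() lists partition columns last. E.g. for a table created as
  //   CREATE TABLE t (c1 INT, c2 INT) PARTITIONED BY (p INT)
  // the iteration order below is (c1, c2, p).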
  List<Column> columns = table_.getColumnsInHiveOrder();
  for (int col = 0; col < columns.size(); ++col) {
    Column tblColumn = columns.get(col);
    boolean matchFound = false;
    for (int i = 0; i < selectListExprs.size(); ++i) {
      if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
        Expr expr = selectListExprs.get(i);
        if (convertToUtc && expr.getType().isTimestamp()) {
          expr = ExprUtil.toUtcTimestampExpr(
              analyzer, expr, true /*expectPreIfNonUnique*/);
        }
        resultExprs_.add(expr);
        if (isKuduTable) mentionedColumns_.add(col);
        matchFound = true;
        break;
      }
    }
    // If no match is found, either the column is a clustering column with a static
    // value, or it was unmentioned and therefore should have a NULL select-list
    // expression if this is an INSERT and the target is not a Kudu table.
    if (!matchFound) {
      if (tblColumn.getPosition() >= numClusteringCols) {
        if (isKuduTable) {
          Preconditions.checkState(tblColumn instanceof KuduColumn);
          KuduColumn kuduCol = (KuduColumn) tblColumn;
          if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()
              && !kuduCol.isAutoIncrementing()) {
            throw new AnalysisException("Missing values for column that is not " +
                "nullable and has no default value: " + kuduCol.getName());
          }
        } else {
          // Unmentioned non-clustering columns get NULL literals with the appropriate
          // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
          NullLiteral nullExpr = NullLiteral.create(tblColumn.getType());
          resultExprs_.add(nullExpr);
          // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), if a
          // partition column is not in columnPermutation_, add a NullLiteral for it
          // to partitionKeyExprs_ (IMPALA-11408).
          if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)
              && icebergPartSpec != null) {
            IcebergColumn targetColumn = (IcebergColumn) tblColumn;
            if (IcebergUtil.isPartitionColumn(targetColumn, icebergPartSpec)) {
              partitionKeyExprs_.add(nullExpr);
              partitionColPos_.add(targetColumn.getPosition());
            }
          }
        }
      }
    }
    // Store exprs for Kudu key columns.
    if (matchFound && isKuduTable) {
      FeKuduTable kuduTable = (FeKuduTable) table_;
      if (kuduTable.getPrimaryKeyColumnNames().contains(tblColumn.getName())) {
        primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
      }
    }
  }
  // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), to ensure that data
  // is written to the correct partition, sort partitionKeyExprs_ into ascending
  // order of the columns' positions in the Iceberg table.
  if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)) {
    List<Pair<Integer, Expr>> exprPairs = Lists.newArrayList();
    for (int i = 0; i < partitionColPos_.size(); i++) {
      exprPairs.add(Pair.create(partitionColPos_.get(i), partitionKeyExprs_.get(i)));
    }
    exprPairs.sort(Comparator.comparingInt(p -> p.first));
    partitionColPos_.clear();
    partitionKeyExprs_.clear();
    for (Pair<Integer, Expr> exprPair : exprPairs) {
      partitionColPos_.add(exprPair.first);
      partitionKeyExprs_.add(exprPair.second);
    }
  }
  if (table_ instanceof FeKuduTable) {
    Preconditions.checkState(!primaryKeyExprs_.isEmpty());
  }
  // TODO: Check that HBase row-key columns are not NULL? See IMPALA-406.
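  // needsGeneratedQueryStatement_ is presumably set when the INSERT was parsed
  // without a source query; in that case, synthesize a SELECT over resultExprs_
  // (which at this point are all NULL literals) so the rest of analysis and
  // planning can proceed uniformly.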
  if (needsGeneratedQueryStatement_) {
    // Build a query statement that returns NULL for every column.
    List<SelectListItem> selectListItems = new ArrayList<>();
    for (Expr e : resultExprs_) {
      selectListItems.add(new SelectListItem(e, null));
    }
    SelectList selectList = new SelectList(selectListItems);
    queryStmt_ = new SelectStmt(selectList, null, null, null, null, null, null);
    queryStmt_.analyze(analyzer);
  }
}