private void prepareExpressions()

in fe/src/main/java/org/apache/impala/analysis/InsertStmt.java [829:1037]


  private void prepareExpressions(List<Column> selectExprTargetColumns,
      List<Expr> selectListExprs, FeTable tbl, Analyzer analyzer)
      throws AnalysisException {
    // Temporary lists of partition key exprs and names in an arbitrary order.
    List<Expr> tmpPartitionKeyExprs = new ArrayList<>();
    List<String> tmpPartitionKeyNames = new ArrayList<>();

    int numClusteringCols = (tbl instanceof FeHBaseTable) ? 0
        : tbl.getNumClusteringCols();
    boolean isKuduTable = table_ instanceof FeKuduTable;
    Set<String> kuduPartitionColumnNames = null;
    if (isKuduTable) {
      kuduPartitionColumnNames = getKuduPartitionColumnNames((FeKuduTable) table_);
    }
    IcebergPartitionSpec icebergPartSpec = null;
    if (isIcebergTarget()) {
      icebergPartSpec = ((FeIcebergTable)table_).getDefaultPartitionSpec();
    }

    SetOperationStmt unionStmt =
        (queryStmt_ instanceof SetOperationStmt) ? (SetOperationStmt) queryStmt_ : null;
    List<Expr> widestTypeExprList = null;
    if (unionStmt != null && unionStmt.getWidestExprs() != null
        && unionStmt.getWidestExprs().size() > 0) {
      widestTypeExprList = unionStmt.getWidestExprs();
    }

    boolean convertToUtc =
        isKuduTable && analyzer.getQueryOptions().isWrite_kudu_utc_timestamps();

    // Check dynamic partition columns for type compatibility.
    for (int i = 0; i < selectListExprs.size(); ++i) {
      Column targetColumn = selectExprTargetColumns.get(i);
      // widestTypeExpr is widest type expression for column i
      Expr widestTypeExpr =
          (widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
      Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
          targetColumn, selectListExprs.get(i), analyzer, widestTypeExpr);
      if (targetColumn.getPosition() < numClusteringCols) {
        // This is a dynamic clustering column
        tmpPartitionKeyExprs.add(compatibleExpr);
        tmpPartitionKeyNames.add(targetColumn.getName());
      } else if (isKuduTable) {
        if (kuduPartitionColumnNames.contains(targetColumn.getName())) {
          tmpPartitionKeyExprs.add(compatibleExpr);
          tmpPartitionKeyNames.add(targetColumn.getName());
        }
      }
      selectListExprs.set(i, compatibleExpr);
    }

    // Check static partition columns, dynamic entries in partitionKeyValues will already
    // be in selectExprTargetColumns and therefore are ignored in this loop
    if (partitionKeyValues_ != null) {
      for (PartitionKeyValue pkv: partitionKeyValues_) {
        if (pkv.isStatic()) {
          // tableColumns is guaranteed to exist after the earlier analysis checks
          Column tableColumn = table_.getColumn(pkv.getColName());
          Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
              tableColumn, pkv.getLiteralValue(), analyzer, null);
          tmpPartitionKeyExprs.add(compatibleExpr);
          tmpPartitionKeyNames.add(pkv.getColName());
        }
      }
    }

    if (isIcebergTarget()) {
      // Add partition key expressions in the order of the Iceberg partition fields.
      IcebergUtil.populatePartitionExprs(
          analyzer, widestTypeExprList, selectExprTargetColumns, selectListExprs,
          (FeIcebergTable)table_, partitionKeyExprs_, partitionColPos_);
    } else {
      // Reorder the partition key exprs and names to be consistent with the target table
      // declaration, and store their column positions.  We need those exprs in the
      // original order to create the corresponding Hdfs folder structure correctly, or
      // the indexes to construct rows to pass to the Kudu partitioning API.
      for (int i = 0; i < table_.getColumns().size(); ++i) {
        Column c = table_.getColumns().get(i);
        for (int j = 0; j < tmpPartitionKeyNames.size(); ++j) {
          if (c.getName().equals(tmpPartitionKeyNames.get(j))) {
            Expr expr = tmpPartitionKeyExprs.get(j);
            if (convertToUtc && expr.getType().isTimestamp()) {
              expr = ExprUtil.toUtcTimestampExpr(
                  analyzer, expr, true /*expectPreIfNonUnique*/);
            }
            partitionKeyExprs_.add(expr);
            partitionColPos_.add(i);
            break;
          }
        }
      }
    }

    if (isIcebergTarget() && icebergPartSpec.hasPartitionFields()) {
      int parts = 0;
      for (IcebergPartitionField pField : icebergPartSpec.getIcebergPartitionFields()) {
        if (pField.getTransformType() != TIcebergPartitionTransformType.VOID) {
          ++parts;
        }
      }
      if (CollectionUtils.isEmpty(columnPermutation_)) {
        Preconditions.checkState(partitionKeyExprs_.size() == parts);
      }
    }
    else if (isKuduTable) {
      Preconditions.checkState(
          partitionKeyExprs_.size() == kuduPartitionColumnNames.size());
    } else {
      Preconditions.checkState(partitionKeyExprs_.size() == numClusteringCols);
    }

    // Make sure we have stats for partitionKeyExprs
    for (Expr expr: partitionKeyExprs_) {
      expr.analyze(analyzer);
    }

    // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
    // order, and add NULL expressions to all missing columns, unless this is an UPSERT.
    List<Column> columns = table_.getColumnsInHiveOrder();
    for (int col = 0; col < columns.size(); ++col) {
      Column tblColumn = columns.get(col);
      boolean matchFound = false;
      for (int i = 0; i < selectListExprs.size(); ++i) {
        if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
          Expr expr = selectListExprs.get(i);
          if (convertToUtc && expr.getType().isTimestamp()) {
            expr = ExprUtil.toUtcTimestampExpr(
                analyzer, expr, true /*expectPreIfNonUnique*/);
          }
          resultExprs_.add(expr);
          if (isKuduTable) mentionedColumns_.add(col);
          matchFound = true;
          break;
        }
      }
      // If no match is found, either the column is a clustering column with a static
      // value, or it was unmentioned and therefore should have a NULL select-list
      // expression if this is an INSERT and the target is not a Kudu table.
      if (!matchFound) {
        if (tblColumn.getPosition() >= numClusteringCols) {
          if (isKuduTable) {
            Preconditions.checkState(tblColumn instanceof KuduColumn);
            KuduColumn kuduCol = (KuduColumn) tblColumn;
            if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()
                && !kuduCol.isAutoIncrementing()) {
              throw new AnalysisException("Missing values for column that is not " +
                  "nullable and has no default value " + kuduCol.getName());
            }
          } else {
            // Unmentioned non-clustering columns get NULL literals with the appropriate
            // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
            NullLiteral nullExpr = NullLiteral.create(tblColumn.getType());
            resultExprs_.add(nullExpr);
            // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), if the
            // partition columns are not in the columnPermutation_, we should fill it
            // with NullLiteral to partitionKeyExprs_ (IMPALA-11408).
            if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)
                && icebergPartSpec != null) {
              IcebergColumn targetColumn = (IcebergColumn) tblColumn;
              if (IcebergUtil.isPartitionColumn(targetColumn, icebergPartSpec)) {
                partitionKeyExprs_.add(nullExpr);
                partitionColPos_.add(targetColumn.getPosition());
              }
            }
          }
        }
      }
      // Store exprs for Kudu key columns.
      if (matchFound && isKuduTable) {
        FeKuduTable kuduTable = (FeKuduTable) table_;
        if (kuduTable.getPrimaryKeyColumnNames().contains(tblColumn.getName())) {
          primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
        }
      }
    }

    // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), to ensure that data is
    // written to the correct partition, we need to make sure that the partitionKeyExprs_
    // is in ascending order according to the column position of the Iceberg tables.
    if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)) {
      List<Pair<Integer, Expr>> exprPairs = Lists.newArrayList();
      for (int i = 0; i < partitionColPos_.size(); i++) {
        exprPairs.add(Pair.create(partitionColPos_.get(i), partitionKeyExprs_.get(i)));
      }
      exprPairs.sort(Comparator.comparingInt(p -> p.first));
      partitionColPos_.clear();
      partitionKeyExprs_.clear();
      for (Pair<Integer, Expr> exprPair : exprPairs) {
        partitionColPos_.add(exprPair.first);
        partitionKeyExprs_.add(exprPair.second);
      }
    }

    if (table_ instanceof FeKuduTable) {
      Preconditions.checkState(!primaryKeyExprs_.isEmpty());
    }

    // TODO: Check that HBase row-key columns are not NULL? See IMPALA-406
    if (needsGeneratedQueryStatement_) {
      // Build a query statement that returns NULL for every column
      List<SelectListItem> selectListItems = new ArrayList<>();
      for(Expr e: resultExprs_) {
        selectListItems.add(new SelectListItem(e, null));
      }
      SelectList selectList = new SelectList(selectListItems);
      queryStmt_ = new SelectStmt(selectList, null, null, null, null, null, null);
      queryStmt_.analyze(analyzer);
    }
  }