private TExecRequest doCreateExecRequest()

in fe/src/main/java/org/apache/impala/service/Frontend.java [2884:3165]


  /**
   * Builds the TExecRequest for the statement described by the query context of
   * 'planCtx'.
   *
   * The method runs these steps in order:
   * 1. Parses the statement and loads the referenced tables into a stmt-local table
   *    cache.
   * 2. Adds the referenced tables and their catalog versions / load times to the
   *    frontend profile.
   * 3. Analyzes the statement (and authorizes it, unless authorization is disabled in
   *    the compilation state) and copies the referenced tables and the
   *    select/where/aggregate/order-by columns into the request.
   * 4. Fills in the request according to the statement type. Catalog operations other
   *    than CTAS, as well as LOAD DATA, SET, admin function, CONVERT TABLE, test case
   *    and KILL QUERY statements return right after this step without being planned.
   * 5. Plans all remaining statements through getPlannedExecRequest(). EXPLAIN returns
   *    the explain request; queries and DML statements return the query exec request.
   *
   * For a non-EXPLAIN INSERT/CTAS into a table that AcidUtils.isTransactionalTable()
   * accepts, a transaction is opened and a write id is allocated when the compilation
   * state has no write id yet (-1); otherwise the saved write id is reused. When
   * 'enable_kudu_transaction' is set, a Kudu transaction is opened or continued for
   * non-EXPLAIN INSERT/CTAS/UPDATE/DELETE statements that target a Kudu table.
   *
   * If anything inside the try block throws, the transaction (or Kudu transaction)
   * flagged on the query context is aborted on a best-effort basis and the original
   * exception is rethrown. Failures before the try block (parsing, metadata loading,
   * analysis) propagate without this cleanup.
   *
   * @param compilerFactory creates the parsed statement; also handed to analysis and to
   *     the Planner
   * @param planCtx provides the query context, the compilation state (authorization
   *     flag, write id, executor group set, available cores) and the explain string
   * @param warnings passed to Analyzer.addWarnings() after analysis; presumably
   *     exchanges analysis warnings with the caller -- TODO confirm the direction
   * @param timeline passed to metadata loading, analysis and planning; also receives
   *     the transaction opened/aborted events marked here
   * @return the populated exec request
   * @throws ImpalaException declared for failures of the steps above; any exception
   *     raised inside the try block is rethrown unchanged after cleanup
   */
  private TExecRequest doCreateExecRequest(CompilerFactory compilerFactory,
      PlanCtx planCtx, List<String> warnings, EventSequence timeline)
      throws ImpalaException {
    TQueryCtx queryCtx = planCtx.getQueryContext();

    // Parse stmt and collect/load metadata to populate a stmt-local table cache
    ParsedStatement parsedStmt = compilerFactory.createParsedStatement(queryCtx);

    // The loader is created for the session's effective user and default database.
    User user = new User(TSessionStateUtil.getEffectiveUser(queryCtx.session));
    StmtMetadataLoader metadataLoader = new StmtMetadataLoader(
        this, queryCtx.session.database, timeline, user, queryCtx.getQuery_id());

    //TODO (IMPALA-8788): should load table write ids in transaction context.
    StmtTableCache stmtTableCache = metadataLoader.loadTables(parsedStmt);
    // Debug hook: when the 'debug_action' query option is set, run it for the
    // LOAD_TABLES_DELAY label right after the tables were loaded.
    if (queryCtx.client_request.query_options.isSetDebug_action()) {
        DebugUtils.executeDebugAction(
            queryCtx.client_request.query_options.getDebug_action(),
            DebugUtils.LOAD_TABLES_DELAY);
    }

    // Add referenced tables to frontend profile
    FrontendProfile.getCurrent().addInfoString("Referenced Tables",
        stmtTableCache.tables.keySet()
            .stream()
            .map(TableName::toString)
            .collect(Collectors.joining(", ")));

    // Add the catalog versions and loaded timestamps. One line per table: full name,
    // catalog version, last loaded time in ms and the same time as a Date string.
    FrontendProfile.getCurrent().addInfoString("Original Table Versions",
        stmtTableCache.tables.values().stream()
            .map(t -> String.join(", ", t.getFullName(),
                Long.toString(t.getCatalogVersion()),
                Long.toString(t.getLastLoadedTimeMs()),
                new Date(t.getLastLoadedTimeMs()).toString()))
            .collect(Collectors.joining("\n")));

    // Analyze and authorize stmt
    AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
    AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory,
        parsedStmt, stmtTableCache, authzChecker_.get(),
        planCtx.compilationState_.disableAuthorization());
    // need to re-fetch the parsedStatement because analysisResult can rewrite the
    // statement.
    parsedStmt = analysisResult.getParsedStmt();

    // Sanity check: an analysis result that gets this far must not carry an exception.
    Preconditions.checkState(analysisResult.getException() == null);
    // Profile access is only recorded in the compilation state when authorization was
    // not disabled.
    if (!planCtx.compilationState_.disableAuthorization()) {
      LOG.info("Analysis and authorization finished.");
      planCtx.compilationState_.setUserHasProfileAccess(
          analysisResult.userHasProfileAccess());
    } else {
      LOG.info("Analysis finished.");
    }
    analysisResult.getAnalyzer().addWarnings(warnings);
    TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
    // Copy the referenced tables and the columns the analyzer reports per clause into
    // the request. Join columns are added later, after planning.
    for (TableName table : stmtTableCache.tables.keySet()) {
      result.addToTables(table.toThrift());
    }
    for (String column : analysisResult.getAnalyzer().selectColumns()) {
      result.addToSelect_columns(column);
    }
    for (String column : analysisResult.getAnalyzer().whereColumns()) {
      result.addToWhere_columns(column);
    }
    for (String column : analysisResult.getAnalyzer().aggregateColumns()) {
      result.addToAggregate_columns(column);
    }
    for (String column : analysisResult.getAnalyzer().orderByColumns()) {
      result.addToOrderby_columns(column);
    }

    // Transfer the expected number of executors in executor group set to
    // analyzer's global state. The info is needed to compute the number of nodes to be
    // used during planner phase for scans (see HdfsScanNode.computeNumNodes()).
    analysisResult.getAnalyzer().setNumExecutorsForPlanning(
        expectedNumExecutor(planCtx.compilationState_.getGroupSet()));
    // The core count handed to the analyzer is clamped to at least 1.
    analysisResult.getAnalyzer().setAvailableCoresPerNode(
        Math.max(1, planCtx.compilationState_.getAvailableCoresPerNode()));
    analysisResult.getAnalyzer().setPoolMemLimit(planCtx.compilationState_.getGroupSet());

    // Everything from here on runs under the catch block at the end of the method,
    // which aborts a transaction flagged on the query context before rethrowing.
    try {
      TQueryOptions queryOptions = queryCtx.client_request.query_options;
      if (analysisResult.isCatalogOp()) {
        result.stmt_type = TStmtType.DDL;
        createCatalogOpRequest(analysisResult, result);
        // Attach the lineage graph to the catalog op only if it has a query text set.
        TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
        if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
          result.catalog_op_request.setLineage_graph(thriftLineageGraph);
        }
        setMtDopForCatalogOp(analysisResult, queryOptions);
        // All DDL operations except for CTAS are done with analysis at this point.
        if (!analysisResult.isCreateTableAsSelectStmt()) {
          return result;
        }
      }
      // Transaction set-up for INSERT/CTAS. EXPLAIN is excluded by the first condition,
      // so it never opens a transaction or allocates a write id here.
      if (!analysisResult.isExplainStmt() &&
          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) {
        InsertStmt insertStmt = analysisResult.getInsertStmt();
        FeTable targetTable = insertStmt.getTargetTable();
        if (AcidUtils.isTransactionalTable(targetTable)) {

          // -1 means that no write id has been saved in the compilation state yet.
          if (planCtx.compilationState_.getWriteId() == -1) {
            // 1st time compilation. Open a transaction and save the writeId.
            long txnId = openTransaction(queryCtx);
            timeline.markEvent("Transaction opened (" + String.valueOf(txnId) + ")");
            Collection<FeTable> tables = stmtTableCache.tables.values();
            // Stays null unless the insert targets a static partition.
            String staticPartitionTarget = null;
            if (insertStmt.isStaticPartitionTarget()) {
              staticPartitionTarget = FeCatalogUtils.getPartitionName(
                  insertStmt.getPartitionKeyValues());
            }
            long writeId = allocateWriteId(queryCtx, targetTable);
            insertStmt.setWriteId(writeId);
            createLockForInsert(txnId, tables, targetTable, insertStmt.isOverwrite(),
                staticPartitionTarget, queryOptions);

            // Saved only after the lock was created, so a failure above leaves the
            // compilation state without a write id.
            planCtx.compilationState_.setWriteId(writeId);
          } else {
            // Continue the transaction by reusing the writeId.
            insertStmt.setWriteId(planCtx.compilationState_.getWriteId());
          }
        }
      } else if (analysisResult.isLoadDataStmt()) {
        // LOAD DATA: single STRING result column "summary", no planning.
        result.stmt_type = TStmtType.LOAD;
        result.setResult_set_metadata(new TResultSetMetadata(
            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
        result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
        return result;
      } else if (analysisResult.isSetStmt()) {
        // SET: STRING result columns "option", "value" and "level", no planning.
        result.stmt_type = TStmtType.SET;
        result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
            new TColumn("option", Type.STRING.toThrift()),
            new TColumn("value", Type.STRING.toThrift()),
            new TColumn("level", Type.STRING.toThrift()))));
        result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
        return result;
      } else if (analysisResult.isAdminFnStmt()) {
        // Admin function: single STRING result column "summary", no planning.
        result.stmt_type = TStmtType.ADMIN_FN;
        result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
            new TColumn("summary", Type.STRING.toThrift()))));
        result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
        return result;
      } else if (analysisResult.isConvertTableToIcebergStmt()) {
        // CONVERT TABLE to Iceberg: single STRING result column "summary", no planning.
        result.stmt_type = TStmtType.CONVERT;
        result.setResult_set_metadata(new TResultSetMetadata(
            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
        result.setConvert_table_request(
            analysisResult.getConvertTableToIcebergStmt().toThrift());
        return result;
      } else if (analysisResult.isTestCaseStmt()) {
        CopyTestCaseStmt testCaseStmt = ((CopyTestCaseStmt) analysisResult.getStmt());
        if (testCaseStmt.isTestCaseExport()) {
          // Export: the test case data is written here and the returned path is stored
          // in the request.
          result.setStmt_type(TStmtType.TESTCASE);
          result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
            new TColumn("Test case data output path", Type.STRING.toThrift()))));
          result.setTestcase_data_path(testCaseStmt.writeTestCaseData());
        } else {
          // Mimic it as a DDL.
          result.setStmt_type(TStmtType.DDL);
          createCatalogOpRequest(analysisResult, result);
        }
        return result;
      } else if (analysisResult.isKillQueryStmt()) {
        // KILL QUERY: single STRING result column "result", no planning.
        result.stmt_type = TStmtType.KILL;
        result.setResult_set_metadata(new TResultSetMetadata(
            Collections.singletonList(new TColumn("result", Type.STRING.toThrift()))));
        result.setKill_query_request(analysisResult.getKillQueryStmt().toThrift());
        return result;
      }

      // Open or continue Kudu transaction if Kudu transaction is enabled and target table
      // is Kudu table.
      if (!analysisResult.isExplainStmt() && queryOptions.isEnable_kudu_transaction()) {
        if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
            && analysisResult.getInsertStmt().isTargetTableKuduTable()) {
          // For INSERT/UPSERT/CTAS statements.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getInsertStmt().getTargetTable(), timeline);
        } else if (analysisResult.isUpdateStmt()
            && analysisResult.getUpdateStmt().isTargetTableKuduTable()) {
          // For UPDATE statement.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getUpdateStmt().getTargetTable(), timeline);
        } else if (analysisResult.isDeleteStmt()
            && analysisResult.getDeleteStmt().isTargetTableKuduTable()) {
          // For DELETE statement.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getDeleteStmt().getTargetTable(), timeline);
        }
      }

      // If unset, set MT_DOP to 0 to simplify the rest of the code.
      // Note that this modifies the query options of the query context in place.
      if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);

      Planner planner = new Planner(compilerFactory, analysisResult, queryCtx, timeline);
      // create TQueryExecRequest
      TQueryExecRequest queryExecRequest =
          getPlannedExecRequest(planner, queryCtx, planCtx, analysisResult, timeline);

      // Join columns are read from the analyzer only after planning has finished.
      for (String column : analysisResult.getAnalyzer().joinColumns()) {
        result.addToJoin_columns(column);
      }

      // Attach the lineage graph to the query exec request only if it has a query text.
      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
        queryExecRequest.setLineage_graph(thriftLineageGraph);
      }

      // Override the per_host_mem_estimate sent to the backend if needed. The explain
      // string is already generated at this point so this does not change the estimate
      // shown in the plan.
      checkAndOverrideMemEstimate(queryExecRequest, queryOptions);

      if (analysisResult.isExplainStmt()) {
        // Return the EXPLAIN request. The query exec request built above is not
        // attached to the result on this path.
        createExplainRequest(planCtx.getExplainString(), result);
        return result;
      }

      result.setQuery_exec_request(queryExecRequest);
      if (analysisResult.isQueryStmt()) {
        result.stmt_type = TStmtType.QUERY;
        result.query_exec_request.stmt_type = result.stmt_type;
        // use the parsed statement from the analysis result because the
        // rewriter may have changed the statement.
        ParsedStatement analyzedStmt = analysisResult.getParsedStmt();
        // fill in the metadata
        result.setResult_set_metadata(
            planner.getTResultSetMetadata(analyzedStmt));
      } else if (analysisResult.isInsertStmt() ||
          analysisResult.isCreateTableAsSelectStmt()) {
        // For CTAS the overall TExecRequest statement type is DDL, but the
        // query_exec_request should be DML
        result.stmt_type =
            analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
        result.query_exec_request.stmt_type = TStmtType.DML;
        // create finalization params of insert stmt
        addFinalizationParamsForInsert(
            queryCtx, queryExecRequest, analysisResult.getInsertStmt());
      } else {
        // Only UPDATE, DELETE, OPTIMIZE and MERGE are expected to reach this branch.
        Preconditions.checkState(
            analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt() ||
                analysisResult.isOptimizeStmt() || analysisResult.isMergeStmt());
        result.stmt_type = TStmtType.DML;
        result.query_exec_request.stmt_type = TStmtType.DML;
        // Add the finalization params that match the statement kind.
        if (analysisResult.isDeleteStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getDeleteStmt(), TIcebergOperation.DELETE);
        } else if (analysisResult.isUpdateStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getUpdateStmt(), TIcebergOperation.UPDATE);
        } else if (analysisResult.isOptimizeStmt()) {
          addFinalizationParamsForIcebergOptimize(queryCtx, queryExecRequest,
              analysisResult.getOptimizeStmt());
        } else if (analysisResult.isMergeStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getMergeStmt(), TIcebergOperation.MERGE);
        }
      }
      return result;
    } catch (Exception e) {
      // Best-effort cleanup before rethrowing. A TransactionException raised by the
      // abort itself is only logged, so the original exception 'e' is what propagates.
      if (queryCtx.isSetTransaction_id()) {
        // The query context carries a transaction id (presumably set by
        // openTransaction() above -- confirm): forget the saved write id and abort.
        try {
          planCtx.compilationState_.setWriteId(-1);
          abortTransaction(queryCtx.getTransaction_id());
          timeline.markEvent("Transaction aborted");
        } catch (TransactionException te) {
          LOG.error("Could not abort transaction because: " + te.getMessage());
        }
      } else if (queryCtx.isIs_kudu_transactional()) {
        // The query context is flagged as Kudu transactional: drop the saved
        // transaction token and abort the Kudu transaction of this query.
        try {
          planCtx.compilationState_.setKuduTransactionToken(null);
          abortKuduTransaction(queryCtx.getQuery_id());
          timeline.markEvent(
              "Kudu transaction aborted: " + PrintId(queryCtx.getQuery_id()));
        } catch (TransactionException te) {
          LOG.error("Could not abort transaction because: " + te.getMessage());
        }
      }
      throw e;
    }
  }