absl::StatusOr QueryEngine::ExecuteSql()

in backend/query/query_engine.cc [973:1085]


absl::StatusOr<QueryResult> QueryEngine::ExecuteSql(
    const Query& query, const QueryContext& context,
    v1::ExecuteSqlRequest_QueryMode query_mode) const {
  absl::Time start_time = absl::Now();

  ZETASQL_ASSIGN_OR_RETURN(auto analyzer_options,
                   MakeAnalyzerOptionsWithParameters(
                       query.declared_params,
                       GetTimeZone(function_catalog_.GetLatestSchema())));
  analyzer_options.set_prune_unused_columns(true);

  QueryEvaluatorForEngine view_evaluator(*this, context);
  Catalog catalog{
      context.schema,
      &function_catalog_,
      type_factory_,
      analyzer_options,
      context.reader,
      &view_evaluator,
      query.change_stream_internal_lookup,
  };

  std::unique_ptr<const zetasql::AnalyzerOutput> analyzer_output;
  if (context.schema->dialect() == database_api::DatabaseDialect::POSTGRESQL &&
      !query.change_stream_internal_lookup.has_value()) {
    ZETASQL_ASSIGN_OR_RETURN(analyzer_output,
                     AnalyzePostgreSQL(query.sql, &catalog, analyzer_options,
                                       type_factory_, &function_catalog_));

  } else {
    ZETASQL_ASSIGN_OR_RETURN(analyzer_output, Analyze(query.sql, &catalog,
                                              analyzer_options, type_factory_));
  }

  ZETASQL_ASSIGN_OR_RETURN(auto params,
                   ExtractParameters(query, analyzer_output.get()));

  ZETASQL_ASSIGN_OR_RETURN(auto resolved_statement,
                   ExtractValidatedResolvedStatementAndOptions(
                       analyzer_output.get(), context));

  // Change stream queries are not directly executed via this generic ExecuteSql
  // function in query engine. If a change stream query reaches here, it is from
  // an incorrect API(only ExecuteStreamingSql is allowed).
  ChangeStreamQueryValidator validator{
      context.schema, start_time,
      absl::flat_hash_map<std::string, zetasql::Value>(params.begin(),
                                                         params.end())};
  ZETASQL_ASSIGN_OR_RETURN(auto is_change_stream,
                   validator.IsChangeStreamQuery(resolved_statement.get()));
  if (is_change_stream) {
    return error::ChangeStreamQueriesMustBeStreaming();
  }

  QueryResult result;
  if (!IsDMLStmt(analyzer_output->resolved_statement()->node_kind())) {
    ZETASQL_ASSIGN_OR_RETURN(
        auto cursor,
        EvaluateQuery(resolved_statement.get(), params, type_factory_,
                      &result.num_output_rows, query_mode,
                      GetTimeZone(function_catalog_.GetLatestSchema())));
    result.rows = std::move(cursor);
  } else {
    ZETASQL_RET_CHECK_NE(context.writer, nullptr);
    analyzer_options.set_prune_unused_columns(false);
    if (context.schema->dialect() ==
        database_api::DatabaseDialect::POSTGRESQL) {
      ZETASQL_ASSIGN_OR_RETURN(analyzer_output,
                       AnalyzePostgreSQL(query.sql, &catalog, analyzer_options,
                                         type_factory_, &function_catalog_));
    } else {
      ZETASQL_ASSIGN_OR_RETURN(
          analyzer_output,
          Analyze(query.sql, &catalog, analyzer_options, type_factory_));
    }
    ZETASQL_ASSIGN_OR_RETURN(resolved_statement,
                     ExtractValidatedResolvedStatementAndOptions(
                         analyzer_output.get(), context));

    // Only execute the SQL statement if the user did not request PLAN mode.
    if (query_mode != v1::ExecuteSqlRequest::PLAN) {
      ZETASQL_ASSIGN_OR_RETURN(
          auto execute_update_result,
          EvaluateUpdate(resolved_statement.get(), &catalog, params,
                         type_factory_, context.schema->dialect(),
                         context.schema,
                         GetTimeZone(function_catalog_.GetLatestSchema())));
      ZETASQL_RETURN_IF_ERROR(context.writer->Write(execute_update_result.mutation));
      result.modified_row_count = execute_update_result.modify_row_count;
      result.rows = std::move(execute_update_result.returning_row_cursor);
    } else {
      // Add the columns and types of the returning clause to the result.
      auto returning_clause = GetReturningClause(resolved_statement.get());
      if (returning_clause != nullptr) {
        std::vector<std::string> names;
        std::vector<const zetasql::Type*> types;
        for (auto& column : returning_clause->output_column_list()) {
          names.push_back(column->column().name());
          types.push_back(column->column().type());
        }
        std::vector<std::vector<zetasql::Value>> values;
        result.rows = std::make_unique<VectorsRowCursor>(names, types, values);
      }
    }
  }
  // Add both undeclared and declared parameters to the result.
  result.parameter_types = analyzer_output->undeclared_parameters();
  for (auto const& param : query.declared_params) {
    result.parameter_types.insert({param.first, param.second.type()});
  }
  result.elapsed_time = absl::Now() - start_time;
  return result;
}