// be/src/service/client-request-state.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "service/client-request-state.h"

#include <optional>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <limits>
#include <gutil/strings/substitute.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include "catalog/catalog-service-client-wrapper.h"
#include "common/status.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/Types_types.h"
#include "kudu/rpc/rpc_controller.h"
#include "rpc/rpc-mgr.inline.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-driver.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "scheduling/admission-control-client.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/auth-util.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/lineage-util.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/runtime-profile.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/control_service.proxy.h"
#include <thrift/Thrift.h>

#include "common/names.h"
#include "control-service.h"

using boost::algorithm::iequals;
using boost::algorithm::join;
using boost::algorithm::replace_all_copy;
using kudu::rpc::RpcController;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace beeswax;
using namespace strings;

DECLARE_bool(abort_on_failed_audit_event);
DECLARE_bool(abort_on_failed_lineage_event);
DECLARE_int32(krpc_port);
DECLARE_int64(max_result_cache_size);
DECLARE_bool(use_local_catalog);

namespace impala {

PROFILE_DEFINE_TIMER(ClientFetchLockWaitTimer, UNSTABLE,
    "Cumulative time client fetch requests waiting for locks.");
PROFILE_DEFINE_SUMMARY_STATS_TIMER(GetInFlightProfileTimeStats, UNSTABLE,
    "Summary stats of the time dumping profiles when the query is still in-flight.");

// Keys into the info string map of the runtime profile referring to specific
// items used by CM for monitoring purposes.
static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
static const string QUERY_STATUS_KEY = "Query Status";
static const string RETRY_STATUS_KEY = "Retry Status";

const TExecRequest ClientRequestState::unknown_exec_request_;

ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend,
    ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session,
    QueryDriver* query_driver)
  : query_ctx_(query_ctx),
    last_active_time_ms_(numeric_limits<int64_t>::max()),
    child_query_executor_(new ChildQueryExecutor),
    session_(move(session)),
    coord_exec_called_(false),
    // Profile is assigned name w/ id after planning
    profile_(RuntimeProfile::Create(&profile_pool_, "Query", false)),
    frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend", false)),
    server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer", false)),
    summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary", false)),
    frontend_(frontend),
    parent_server_(server),
    start_time_us_(UnixMicros()),
    fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms),
    parent_driver_(query_driver) {
  bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND;
  // "Impala Backend Timeline" was specifically chosen to exploit the lexicographical
  // ordering defined by the underlying std::map holding the timelines displayed in
  // the web UI. This helps ensure that "Frontend Timeline" is displayed before
  // "Impala Backend Timeline".
  query_events_ = summary_profile_->AddEventSequence(
      is_external_fe ? "Impala Backend Timeline" : "Query Timeline");
  query_events_->Start();
  profile_->AddChild(summary_profile_);
#ifndef NDEBUG
  profile_->AddInfoString("DEBUG MODE WARNING",
      "Query profile created while running a "
      "DEBUG build of Impala. "
      "Use RELEASE builds to measure query performance.");
#endif
  row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
  num_rows_fetched_counter_ = ADD_COUNTER(server_profile_, "NumRowsFetched", TUnit::UNIT);
  row_materialization_rate_ = server_profile_->AddDerivedCounter(
      "RowMaterializationRate", TUnit::UNIT_PER_SECOND,
      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, num_rows_fetched_counter_,
          row_materialization_timer_));
  num_rows_fetched_from_cache_counter_ =
      ADD_COUNTER(server_profile_, "NumRowsFetchedFromCache", TUnit::UNIT);
  client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
  client_wait_time_stats_ =
      ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats");
  rpc_read_timer_ = ADD_TIMER(server_profile_, "RPCReadTimer");
  rpc_write_timer_ = ADD_TIMER(server_profile_, "RPCWriteTimer");
  rpc_count_ = ADD_COUNTER(server_profile_, "RPCCount", TUnit::UNIT);
  get_inflight_profile_time_stats_ =
      PROFILE_GetInFlightProfileTimeStats.Instantiate(server_profile_);
  client_fetch_lock_wait_timer_ =
      PROFILE_ClientFetchLockWaitTimer.Instantiate(server_profile_);
  profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
  summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
  summary_profile_->AddInfoString("Session Type", PrintValue(session_type()));
  if (session_type() == TSessionType::HIVESERVER2
      || session_type() == TSessionType::EXTERNAL_FRONTEND) {
    summary_profile_->AddInfoString(
        "HiveServer2 Protocol Version", Substitute("V$0", 1 + session_->hs2_version));
  }
  // Certain API clients expect Start Time and End Time to be date-time strings
  // of nanosecond precision, so we explicitly specify the precision here.
  summary_profile_->AddInfoString("Start Time",
      ToStringFromUnixMicros(start_time_us(), TimePrecision::Nanosecond));
  summary_profile_->AddInfoString("End Time", "");
  summary_profile_->AddInfoString("Duration", "");
  summary_profile_->AddInfoString("Query Type", "N/A");
  summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState()));
  summary_profile_->AddInfoString(
      "Impala Query State", ExecStateToString(exec_state()));
  summary_profile_->AddInfoString("Query Status", "OK");
  summary_profile_->AddInfoString(
      "Impala Version", GetVersionString(/* compact */ true));
  summary_profile_->AddInfoString("User", effective_user());
  summary_profile_->AddInfoString("Connected User", connected_user());
  summary_profile_->AddInfoString("Delegated User", do_as_user());
  summary_profile_->AddInfoString("Network Address",
      TNetworkAddressToString(session_->network_address));
  if (!session_->http_origin.empty()) {
    /// If using hs2-http protocol, this is the origin of the session
    /// as recorded in the X-Forwarded-For http message header.
    summary_profile_->AddInfoString("Http Origin", session_->http_origin);
  }
  summary_profile_->AddInfoString("Default Db", default_db());
  summary_profile_->AddInfoStringRedacted(
      "Sql Statement", query_ctx_.client_request.stmt);
  summary_profile_->AddInfoString("Coordinator",
      TNetworkAddressToString(ExecEnv::GetInstance()->configured_backend_address()));
  summary_profile_->AddChild(frontend_profile_);
  AdmissionControlClient::Create(query_ctx_, &admission_control_client_);
}

ClientRequestState::~ClientRequestState() {
  DCHECK(wait_thread_.get() == NULL) << "Finalize() needs to be called!";
  DCHECK(!track_rpcs_); // Should get set to false in Finalize()
  DCHECK(pending_rpcs_.empty()); // Should get cleared in Finalize()
  UnRegisterRemainingRPCs(); // Avoid memory leaks if Finalize() didn't get called
}

Status ClientRequestState::SetResultCache(QueryResultSet* cache, int64_t max_size) {
  lock_guard<mutex> l(lock_);
  DCHECK(result_cache_ == NULL);
  result_cache_.reset(cache);
  if (max_size > FLAGS_max_result_cache_size) {
    return Status(
        Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.",
            max_size, FLAGS_max_result_cache_size));
  }
  result_cache_max_size_ = max_size;
  return Status::OK();
}

void ClientRequestState::SetRemoteSubmitTime(int64_t remote_submit_time) {
  int64_t ack_submit_time = min(MonotonicStopWatch::Now(), remote_submit_time);
  if (ack_submit_time < remote_submit_time) {
    VLOG_QUERY << "Ignoring remote_submit_time (" << remote_submit_time
               << " ns) that is more than coordinator time (" << ack_submit_time
               << " ns) for query id=" << PrintId(query_id());
  }
  query_events_->Start(ack_submit_time);
}

void ClientRequestState::SetFrontendProfile(const TExecRequest& exec_request) {
  // Should we defer creating and adding the child until here? probably.
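  // Added note (not in the upstream source): TRuntimeProfileTree carries the profile
  // tree as a flattened, preorder vector of nodes. Node 0 below becomes the root, and
  // setting its num_children tells the deserializer how many of the following nodes
  // hang off it when the tree is rebuilt during Update().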
  TRuntimeProfileTree prof_tree;
  prof_tree.nodes.emplace_back(std::move(exec_request.profile));
  for (auto& child : exec_request.profile_children) {
    prof_tree.nodes.emplace_back(std::move(child));
  }
  prof_tree.nodes.at(0).num_children = prof_tree.nodes.size() - 1;
  frontend_profile_->Update(prof_tree, false);
}

void ClientRequestState::AddBlacklistedExecutorAddress(const NetworkAddressPB& addr) {
  lock_guard<mutex> l(lock_);
  if (!WasRetried()) blacklisted_executor_addresses_.emplace(addr);
}

void ClientRequestState::SetBlacklistedExecutorAddresses(
    std::unordered_set<NetworkAddressPB>& executor_addresses) {
  DCHECK(blacklisted_executor_addresses_.empty());
  if (!executor_addresses.empty()) {
    blacklisted_executor_addresses_.insert(
        executor_addresses.begin(), executor_addresses.end());
  }
}

Status ClientRequestState::Exec() {
  MarkActive();
  const TExecRequest& exec_req = exec_request();
  profile_->AddChild(server_profile_);
  summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type()));
  summary_profile_->AddInfoString("Query Options (set by configuration)",
      DebugQueryOptions(query_ctx_.client_request.query_options));
  summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
      DebugQueryOptions(exec_req.query_options));
  if (!exec_req.tables.empty()) {
    summary_profile_->AddInfoString("Tables Queried", PrintTableList(exec_req.tables));
  }
  if (!exec_req.select_columns.empty()) {
    summary_profile_->AddInfoString("Select Columns", join(exec_req.select_columns, ","));
  }
  if (!exec_req.where_columns.empty()) {
    summary_profile_->AddInfoString("Where Columns", join(exec_req.where_columns, ","));
  }
  if (!exec_req.join_columns.empty()) {
    summary_profile_->AddInfoString("Join Columns", join(exec_req.join_columns, ","));
  }
  if (!exec_req.aggregate_columns.empty()) {
    summary_profile_->AddInfoString(
        "Aggregate Columns", join(exec_req.aggregate_columns, ","));
  }
  if (!exec_req.orderby_columns.empty()) {
    summary_profile_->AddInfoString(
        "OrderBy Columns", join(exec_req.orderby_columns, ","));
  }
  if (query_ctx_.__isset.overridden_mt_dop_value) {
    DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop);
    summary_profile_->AddInfoString("MT_DOP limited by admission control",
        Substitute("Requested MT_DOP=$0 reduced to MT_DOP=$1",
            query_ctx_.overridden_mt_dop_value,
            query_ctx_.client_request.query_options.mt_dop));
  }
  switch (exec_req.stmt_type) {
    case TStmtType::QUERY:
    case TStmtType::DML:
      DCHECK(exec_req.__isset.query_exec_request);
      RETURN_IF_ERROR(
          ExecQueryOrDmlRequest(exec_req.query_exec_request, true /*async*/));
      break;
    case TStmtType::EXPLAIN: {
      request_result_set_.reset(new vector<TResultRow>(
          exec_req.explain_result.results));
      break;
    }
    case TStmtType::TESTCASE: {
      DCHECK(exec_req.__isset.testcase_data_path);
      SetResultSet(vector<string>(1, exec_req.testcase_data_path));
      break;
    }
    case TStmtType::DDL: {
      DCHECK(exec_req.__isset.catalog_op_request);
      LOG_AND_RETURN_IF_ERROR(ExecDdlRequest());
      break;
    }
    case TStmtType::LOAD: {
      DCHECK(exec_req.__isset.load_data_request);
      LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
      break;
    }
    case TStmtType::SET: {
      DCHECK(exec_req.__isset.set_query_option_request);
      lock_guard<mutex> l(session_->lock);
      if (exec_req.set_query_option_request.__isset.key) {
        // "SET key=value" updates the session query options.
        DCHECK(exec_req.set_query_option_request.__isset.value);
        const auto& key = exec_req.set_query_option_request.key;
        const auto& value = exec_req.set_query_option_request.value;
        RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options,
            &session_->set_query_options_mask));
        SetResultSet({}, {}, {});
        if (iequals(key, "idle_session_timeout")) {
          // IMPALA-2248: Session timeout is set as a query option
          session_->last_accessed_ms = UnixMillis(); // do not expire session immediately
          session_->UpdateTimeout();
          VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout="
                     << PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S);
        }
      } else if (exec_req.set_query_option_request.__isset.query_option_type
          && exec_req.set_query_option_request.query_option_type
              == TQueryOptionType::UNSET_ALL) {
        // "UNSET ALL"
        RETURN_IF_ERROR(ResetAllQueryOptions(
            &session_->set_query_options, &session_->set_query_options_mask));
        SetResultSet({}, {}, {});
      } else {
        // "SET" or "SET ALL"
        bool is_set_all = exec_req.set_query_option_request.__isset.query_option_type
            && exec_req.set_query_option_request.query_option_type
                == TQueryOptionType::SET_ALL;
        PopulateResultForSet(is_set_all);
      }
      break;
    }
    case TStmtType::ADMIN_FN:
      if (exec_req.admin_request.type == TAdminRequestType::SHUTDOWN) {
        RETURN_IF_ERROR(ExecShutdownRequest());
      } else if (exec_req.admin_request.type == TAdminRequestType::EVENT_PROCESSOR) {
        RETURN_IF_ERROR(ExecEventProcessorCmd());
      } else {
        DCHECK(false);
      }
      break;
    case TStmtType::CONVERT:
      DCHECK(exec_req.__isset.convert_table_request);
      LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest());
      break;
    case TStmtType::UNKNOWN:
      DCHECK(false);
      return Status("Exec request uninitialized during execution");
    case TStmtType::KILL:
      DCHECK(exec_req.__isset.kill_query_request);
      LOG_AND_RETURN_IF_ERROR(ExecKillQueryRequest());
      break;
    default:
      return Status(Substitute("Unknown exec request stmt type: $0", exec_req.stmt_type));
  }

  if (async_exec_thread_.get() == nullptr) {
    UpdateNonErrorExecState(ExecState::RUNNING);
  }
  return Status::OK();
}

void ClientRequestState::PopulateResultForSet(bool is_set_all) {
  map<string, string> config;
  TQueryOptionsToMap(query_options(), &config);
  vector<string> keys, values, levels;
  map<string, string>::const_iterator itr = config.begin();
  for (; itr != config.end(); ++itr) {
    const auto opt_level_id = parent_server_->query_option_levels_[itr->first];
    if (!is_set_all && (opt_level_id == TQueryOptionLevel::DEVELOPMENT
        || opt_level_id == TQueryOptionLevel::DEPRECATED
        || opt_level_id == TQueryOptionLevel::REMOVED)) {
      continue;
    }
    keys.push_back(itr->first);
    values.push_back(itr->second);
    const auto opt_level = _TQueryOptionLevel_VALUES_TO_NAMES.find(opt_level_id);
    DCHECK(opt_level != _TQueryOptionLevel_VALUES_TO_NAMES.end());
    levels.push_back(opt_level->second);
  }
  SetResultSet(keys, values, levels);
}

Status ClientRequestState::ExecLocalCatalogOp(
    const TCatalogOpRequest& catalog_op) {
  switch (catalog_op.op_type) {
    case TCatalogOpType::USE: {
      lock_guard<mutex> l(session_->lock);
      session_->database = exec_request().catalog_op_request.use_db_params.db;
      return Status::OK();
    }
    case TCatalogOpType::SHOW_TABLES:
    case TCatalogOpType::SHOW_VIEWS: {
      const TShowTablesParams* params = &catalog_op.show_tables_params;
      // A NULL pattern means match all tables of the specified table types. However,
      // Thrift string types can't be NULL in C++, so we have to test if it's set rather
      // than just blindly using the value.
      const string* table_name_pattern = params->__isset.show_pattern ?
          &(params->show_pattern) : nullptr;
      TGetTablesResult table_names;
      const set<TImpalaTableType::type>& table_types = params->table_types;
      RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name_pattern,
          &query_ctx_.session, table_types, &table_names));
      SetResultSet(table_names.tables);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_METADATA_TABLES: {
      const TShowTablesParams* params = &catalog_op.show_tables_params;
      // A NULL pattern means match all tables of the specified table types. However,
      // Thrift string types can't be NULL in C++, so we have to test if it's set rather
      // than just blindly using the value.
      const string* metadata_table_name_pattern =
          params->__isset.show_pattern ? &(params->show_pattern) : nullptr;
      DCHECK(params->__isset.tbl);
      const string& table_name = params->tbl;
      TGetTablesResult table_names;
      RETURN_IF_ERROR(frontend_->GetMetadataTableNames(params->db, table_name,
          metadata_table_name_pattern, &query_ctx_.session, &table_names));
      SetResultSet(table_names.tables);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_DBS: {
      const TShowDbsParams* params = &catalog_op.show_dbs_params;
      TGetDbsResult dbs;
      const string* db_pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(
          frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
      vector<string> names, comments;
      names.reserve(dbs.dbs.size());
      comments.reserve(dbs.dbs.size());
      for (const TDatabase& db: dbs.dbs) {
        names.push_back(db.db_name);
        comments.push_back(db.metastore_db.description);
      }
      SetResultSet(names, comments);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_DATA_SRCS: {
      const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
      TGetDataSrcsResult result;
      const string* pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(
          frontend_->GetDataSrcMetadata(pattern, &result));
      SetResultSet(result.data_src_names, result.locations, result.class_names,
          result.api_versions);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_STATS: {
      const TShowStatsParams& params = catalog_op.show_stats_params;
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetStats(params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    case TCatalogOpType::SHOW_FUNCTIONS: {
      const TShowFunctionsParams* params = &catalog_op.show_fns_params;
      TGetFunctionsResult functions;
      const string* fn_pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(frontend_->GetFunctions(
          params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
      SetResultSet(functions.fn_ret_types, functions.fn_signatures,
          functions.fn_binary_types, functions.fn_persistence);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_ROLES: {
      const TShowRolesParams& params = catalog_op.show_roles_params;
      // If we have made it here, the user has privileges to execute this operation.
      // Return the results.
      TShowRolesResult result;
      RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
      SetResultSet(result.role_names);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_GRANT_PRINCIPAL: {
      const TShowGrantPrincipalParams& params = catalog_op.show_grant_principal_params;
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetPrincipalPrivileges(params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_HISTORY: {
      // This operation is supported for Iceberg tables only.
      const TDescribeHistoryParams& params = catalog_op.describe_history_params;
      TGetTableHistoryResult result;
      RETURN_IF_ERROR(frontend_->GetTableHistory(params, &result));
      request_result_set_.reset(new vector<TResultRow>);
      request_result_set_->resize(result.result.size());
      for (int i = 0; i < result.result.size(); ++i) {
        const TGetTableHistoryResultItem item = result.result[i];
        TResultRow& result_row = (*request_result_set_.get())[i];
        result_row.__isset.colVals = true;
        result_row.colVals.resize(4);
        const Timezone* local_tz = TimezoneDatabase::FindTimezone(
            query_options().timezone);
        TimestampValue tv = TimestampValue::FromUnixTimeMicros(
            item.creation_time * 1000, local_tz);
        result_row.colVals[0].__set_string_val(tv.ToString());
        result_row.colVals[1].__set_string_val(std::to_string(item.snapshot_id));
        result_row.colVals[2].__set_string_val(
            (item.__isset.parent_id) ? std::to_string(item.parent_id) : "NULL");
        result_row.colVals[3].__set_string_val(
            (item.is_current_ancestor) ? "TRUE" : "FALSE");
      }
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_DB: {
      TDescribeResult response;
      RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params, &response));
      // Set the result set
      request_result_set_.reset(new vector<TResultRow>(response.results));
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_TABLE: {
      TDescribeResult response;
      const TDescribeTableParams& params = catalog_op.describe_table_params;
      RETURN_IF_ERROR(frontend_->DescribeTable(params, query_ctx_.session, &response));
      // Set the result set
      request_result_set_.reset(new vector<TResultRow>(response.results));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_CREATE_TABLE: {
      string response;
      RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
          &response));
      SetResultSet(vector<string>(1, response));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_CREATE_FUNCTION: {
      string response;
      RETURN_IF_ERROR(frontend_->ShowCreateFunction(
          catalog_op.show_create_function_params, &response));
      SetResultSet(vector<string>(1, response));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_FILES: {
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    default: {
      stringstream ss;
      ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
      return Status(ss.str());
    }
  }
}

Status ClientRequestState::ExecQueryOrDmlRequest(
    const TQueryExecRequest& query_exec_request, bool isAsync) {
  // we always need at least one plan fragment
  DCHECK(query_exec_request.plan_exec_info.size() > 0);
  if (query_exec_request.__isset.query_plan) {
    stringstream plan_ss;
    // Add some delimiters to make it clearer where the plan
    // begins and the profile ends
    plan_ss << "\n----------------\n"
            << query_exec_request.query_plan
            << "----------------";
    summary_profile_->AddInfoStringRedacted("Plan", plan_ss.str());
  }
  // Add info strings consumed by CM: Estimated mem and tables missing stats.
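  // Added note (not in the upstream source): the keys used below (PER_HOST_MEM_KEY and
  // friends, defined at the top of this file) are parsed by CM for monitoring, so
  // their literal values are effectively an external interface.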
  if (query_exec_request.__isset.per_host_mem_estimate) {
    stringstream ss;
    ss << query_exec_request.per_host_mem_estimate;
    summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
  }
  if (!query_exec_request.query_ctx.__isset.parent_query_id
      && query_exec_request.query_ctx.__isset.tables_missing_stats
      && !query_exec_request.query_ctx.tables_missing_stats.empty()) {
    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_missing_stats));
  }
  if (!query_exec_request.query_ctx.__isset.parent_query_id
      && query_exec_request.query_ctx.__isset.tables_with_corrupt_stats
      && !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_with_corrupt_stats));
  }
  if (query_exec_request.query_ctx.__isset.tables_missing_diskids
      && !query_exec_request.query_ctx.tables_missing_diskids.empty()) {
    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_missing_diskids));
  }
  {
    lock_guard<mutex> l(lock_);
    // Don't start executing the query if Cancel() was called concurrently with Exec().
    if (is_cancelled_) return Status::CANCELLED;
  }
  if (isAsync) {
    // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
    // the query should be in the PENDING state before the Exec RPC returns.
    UpdateNonErrorExecState(ExecState::PENDING);
    RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
        &ClientRequestState::FinishExecQueryOrDmlRequest, this,
        &async_exec_thread_, true));
  } else {
    // Update query_status_ as necessary.
    FinishExecQueryOrDmlRequest();
    return query_status_;
  }
  return Status::OK();
}

void ClientRequestState::FinishExecQueryOrDmlRequest() {
  const TExecRequest& exec_req = exec_request();
  DCHECK(exec_req.__isset.query_exec_request);
  UniqueIdPB query_id_pb;
  TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);
  Status admit_status = admission_control_client_->SubmitForAdmission(
      {query_id_pb, ExecEnv::GetInstance()->backend_id(),
          exec_req.query_exec_request, exec_req.query_options,
          summary_profile_, blacklisted_executor_addresses_},
      query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_);
  {
    lock_guard<mutex> l(lock_);
    if (!UpdateQueryStatus(admit_status).ok()) return;
  }
  DCHECK(schedule_.get() != nullptr);
  // Note that we don't need to check for cancellation between admission and query
  // startup. The query was not cancelled right before being admitted and the window here
  // is small enough to not require special handling. Instead we start the query and then
  // cancel it through the check below if necessary.
  DebugActionNoFail(exec_req.query_options, "CRS_BEFORE_COORD_STARTS");
  // Register the query with the server to support cancellation. This happens after
  // admission because now the set of executors is fixed and an executor failure will
  // cause a query failure.
  parent_server_->RegisterQueryLocations(schedule_->backend_exec_params(), query_id());
  coord_.reset(new Coordinator(this, exec_req, *schedule_.get(), query_events_));
  Status exec_status = coord_->Exec();
  DebugActionNoFail(exec_req.query_options, "CRS_AFTER_COORD_STARTS");
  // Make coordinator profile visible, even upon failure.
  if (coord_->query_profile() != nullptr) profile_->AddChild(coord_->query_profile());
  bool cancelled = false;
  Status cancellation_status;
  {
    lock_guard<mutex> l(lock_);
    if (!UpdateQueryStatus(exec_status).ok()) return;
    // Coordinator::Exec() finished successfully - it is safe to concurrently access
    // 'coord_'. This thread needs to cancel the coordinator if cancellation occurred
    // *before* 'coord_' was accessible to other threads. Once the lock is dropped, any
    // future calls to Cancel() are responsible for calling Coordinator::Cancel(), so
    // while holding the lock we need to both perform a check for cancellation and make
    // the coord_ visible.
    coord_exec_called_.Store(true);
    cancelled = is_cancelled_;
    if (cancelled) {
      VLOG_QUERY << "Cancelled right after starting the coordinator query id="
                 << PrintId(query_id());
      discard_result(UpdateQueryStatus(Status::CANCELLED));
    }
  }
  if (cancelled) {
    coord_->Cancel();
    return;
  }
  UpdateNonErrorExecState(ExecState::RUNNING);
}

Status ClientRequestState::ExecDdlRequestImplSync() {
  if (catalog_op_type() != TCatalogOpType::DDL
      && catalog_op_type() != TCatalogOpType::RESET_METADATA) {
    Status status = ExecLocalCatalogOp(exec_request().catalog_op_request);
    lock_guard<mutex> l(lock_);
    return UpdateQueryStatus(status);
  }
  if (ddl_type() == TDdlType::COMPUTE_STATS) {
    const TComputeStatsParams& compute_stats_params =
        exec_request().catalog_op_request.ddl_params.compute_stats_params;
    RuntimeProfile* child_profile =
        RuntimeProfile::Create(&profile_pool_, "Child Queries");
    profile_->AddChild(child_profile);
    // Add child queries for computing table and column stats.
    vector<ChildQuery> child_queries;
    if (compute_stats_params.__isset.tbl_stats_query) {
      RuntimeProfile* profile =
          RuntimeProfile::Create(&profile_pool_, "Table Stats Query");
      child_profile->AddChild(profile);
      child_queries.emplace_back(compute_stats_params.tbl_stats_query, this,
          parent_server_, profile, &profile_pool_);
    }
    if (compute_stats_params.__isset.col_stats_query) {
      RuntimeProfile* profile =
          RuntimeProfile::Create(&profile_pool_, "Column Stats Query");
      child_profile->AddChild(profile);
      child_queries.emplace_back(compute_stats_params.col_stats_query, this,
          parent_server_, profile, &profile_pool_);
    }
    if (child_queries.size() > 0) {
      RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
    } else {
      SetResultSet({"No partitions selected for incremental stats update."});
    }
    return Status::OK();
  }
  DCHECK(false) << "Not handled sync exec ddl request.";
  return Status::OK();
}

void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
  bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
  const TExecRequest& exec_req = exec_request();
  catalog_op_executor_.reset(
      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
  // Indirectly check if running in thread async_exec_thread_.
  if (exec_in_worker_thread) {
    VLOG_QUERY << "Running in worker thread";
    DCHECK(exec_state() == ExecState::PENDING);
    // 1. For any non-CTAS DDLs, transition to RUNNING
    // 2. For CTAS DDLs, transition to RUNNING during FinishExecQueryOrDmlRequest()
    //    called by ExecQueryOrDmlRequest().
    if (!is_CTAS) UpdateNonErrorExecState(ExecState::RUNNING);
  }
  // Optionally wait with a debug action before Exec() below.
  DebugActionNoFail(exec_req.query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");
  Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request);
  query_events_->MarkEvent("CatalogDdlRequest finished");
  AddCatalogTimeline();
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
  // If this is a CTAS request, there will usually be more work to do
  // after executing the CREATE TABLE statement (the INSERT portion of the operation).
  // The exception is if the user specified IF NOT EXISTS and the table already
  // existed, in which case we do not execute the INSERT.
  if (catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT
      && !catalog_op_executor_->ddl_exec_response()->new_table_created) {
    DCHECK(exec_req.catalog_op_request.
        ddl_params.create_table_params.if_not_exists);
    return;
  }
  // Add newly created table to catalog cache.
  status = parent_server_->ProcessCatalogUpdateResult(
      *catalog_op_executor_->update_catalog_result(),
      exec_req.query_options.sync_ddl, query_options(), query_events_);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
  if (is_CTAS) {
    // At this point, the remainder of the CTAS request executes
    // like a normal DML request. As with other DML requests, it will
    // wait for another catalog update if any partitions were altered as a result
    // of the operation.
    DCHECK(exec_req.__isset.query_exec_request);
    RETURN_VOID_IF_ERROR(
        ExecQueryOrDmlRequest(exec_req.query_exec_request, !exec_in_worker_thread));
  }
  // Set the results to be reported to the client. Do this under lock to avoid races
  // with ImpalaServer::GetResultSetMetadata().
  {
    lock_guard<mutex> l(lock_);
    SetResultSet(catalog_op_executor_->ddl_exec_response());
  }
}

bool ClientRequestState::ShouldRunExecDdlAsync() {
  // Local catalog op DDL will run synchronously.
  if (catalog_op_type() != TCatalogOpType::DDL
      && catalog_op_type() != TCatalogOpType::RESET_METADATA) {
    return false;
  }
  // The exec DDL part of compute stats will run synchronously.
  if (ddl_type() == TDdlType::COMPUTE_STATS) return false;
  return true;
}

Status ClientRequestState::ExecDdlRequest() {
  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
      PrintValue(ddl_type()) : PrintValue(catalog_op_type());
  bool async_ddl = ShouldRunExecDdlAsync();
  bool async_ddl_enabled = exec_request().query_options.enable_async_ddl_execution;
  string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous";
  summary_profile_->AddInfoString("DDL Type", op_type);
  summary_profile_->AddInfoString("DDL execution mode", exec_mode);
  VLOG_QUERY << "DDL exec mode=" << exec_mode;
  if (!async_ddl) return ExecDdlRequestImplSync();
  if (async_ddl_enabled) {
    // Transition the exec state out of INITIALIZED to PENDING to make available the
    // runtime profile for the DDL. Later on in ExecDdlRequestImpl(), the state
    // further transitions to RUNNING.
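    // Added note (not in the upstream source): once the DDL is handed to the worker
    // thread below, the Status returned here only reflects thread creation. Execution
    // errors are recorded in query_status_ by ExecDdlRequestImpl() and surface when
    // WaitInternal() joins async_exec_thread_.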
    UpdateNonErrorExecState(ExecState::PENDING);
    return Thread::Create("impala-server", "async_exec_thread_",
        &ClientRequestState::ExecDdlRequestImpl, this,
        true /*exec in a worker thread*/, &async_exec_thread_);
  } else {
    ExecDdlRequestImpl(false /*exec in the same thread as the caller*/);
    return query_status_;
  }
}

void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
  const TExecRequest& exec_req = exec_request();
  if (exec_in_worker_thread) {
    VLOG_QUERY << "Running in worker thread";
    DCHECK(exec_state() == ExecState::PENDING);
    UpdateNonErrorExecState(ExecState::RUNNING);
  }
  DebugActionNoFail(
      exec_req.query_options, "CRS_DELAY_BEFORE_LOAD_DATA");
  TLoadDataResp response;
  Status status = frontend_->LoadData(exec_req.load_data_request, &response);
  if (exec_req.load_data_request.iceberg_tbl) {
    ExecLoadIcebergDataRequestImpl(response);
  }
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->push_back(response.load_summary);
  // We use TUpdateCatalogRequest to refresh the table metadata so that it will
  // fire an insert event just like an insert statement.
  TUpdatedPartition updatedPartition;
  updatedPartition.files.insert(updatedPartition.files.end(),
      response.loaded_files.begin(), response.loaded_files.end());
  TUpdateCatalogRequest catalog_update;
  // The partition_name is an empty string for unpartitioned tables.
  catalog_update.updated_partitions[response.partition_name] = updatedPartition;
  catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
  catalog_update.__set_header(GetCatalogServiceRequestHeader());
  catalog_update.target_table = exec_req.load_data_request.table_name.table_name;
  catalog_update.db_name = exec_req.load_data_request.table_name.db_name;
  catalog_update.is_overwrite = exec_req.load_data_request.overwrite;
  CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(),
      *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
  TUpdateCatalogResponse resp;
  status = client.DoRpc(
      &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
  query_events_->MarkEvent("UpdateCatalog finished");
  if (resp.__isset.profile) {
    for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) {
      summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline);
    }
  }
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
  status = parent_server_->ProcessCatalogUpdateResult(
      resp.result, exec_req.query_options.sync_ddl, query_options(), query_events_);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
}

void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
  TLoadDataReq load_data_req = exec_request().load_data_request;
  RuntimeProfile* child_profile =
      RuntimeProfile::Create(&profile_pool_, "Child Queries");
  profile_->AddChild(child_profile);
  // Add child queries for the CREATE, INSERT and DROP steps of the Iceberg load.
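  // Added note (not in the upstream source): an Iceberg LOAD DATA is executed as three
  // child queries prepared below: CREATE a temporary table over the files to load,
  // INSERT from it into the destination Iceberg table, and DROP the temporary table.
  // On success the staging directory is deleted; on failure the already-moved files
  // are moved back to the source path.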
  vector<ChildQuery> child_queries;
  // Prepare CREATE
  RuntimeProfile* create_profile =
      RuntimeProfile::Create(&profile_pool_, "Create table query");
  child_profile->AddChild(create_profile);
  child_queries.emplace_back(response.create_tmp_tbl_query, this, parent_server_,
      create_profile, &profile_pool_);
  // Prepare INSERT
  RuntimeProfile* insert_profile =
      RuntimeProfile::Create(&profile_pool_, "Insert query");
  child_profile->AddChild(insert_profile);
  child_queries.emplace_back(load_data_req.insert_into_dst_tbl_query, this,
      parent_server_, insert_profile, &profile_pool_);
  // Prepare DROP
  RuntimeProfile* drop_profile =
      RuntimeProfile::Create(&profile_pool_, "Drop table query");
  child_profile->AddChild(drop_profile);
  child_queries.emplace_back(load_data_req.drop_tmp_tbl_query, this, parent_server_,
      drop_profile, &profile_pool_);
  // Execute queries
  RETURN_VOID_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
  vector<ChildQuery*> completed_queries;
  Status query_status = child_query_executor_->WaitForAll(&completed_queries);
  if (query_status.ok()) {
    const char* path = response.create_location.c_str();
    string delete_err = "Load was successful, but failed to remove staging data under '"
        + response.create_location + "', HDFS error: ";
    hdfsFS hdfs_conn;
    Status hdfs_ret = HdfsFsCache::instance()->GetConnection(path, &hdfs_conn);
    if (!hdfs_ret.ok()) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + hdfs_ret.GetDetail())));
    }
    if (hdfsDelete(hdfs_conn, path, 1)) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + strerror(errno))));
    }
  } else {
    const char* dst_path = load_data_req.source_path.c_str();
    hdfsFS hdfs_dst_conn;
    string revert_err = "Failed to load data and failed to revert data movement, "
        "please check source and staging directory under '"
        + response.create_location + "', Query error: " + query_status.GetDetail()
        + " HDFS error: ";
    Status hdfs_ret = HdfsFsCache::instance()->GetConnection(dst_path, &hdfs_dst_conn);
    if (!hdfs_ret.ok()) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + hdfs_ret.GetDetail())));
    }
    for (const string& src_path : response.loaded_files) {
      hdfsFS hdfs_src_conn;
      hdfs_ret = HdfsFsCache::instance()->GetConnection(src_path, &hdfs_src_conn);
      if (!hdfs_ret.ok()) {
        lock_guard<mutex> l(lock_);
        RETURN_VOID_IF_ERROR(
            UpdateQueryStatus(Status(revert_err + hdfs_ret.GetDetail())));
      }
      if (hdfsMove(hdfs_src_conn, src_path.c_str(), hdfs_dst_conn, dst_path)) {
        lock_guard<mutex> l(lock_);
        RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + strerror(errno))));
      }
    }
  }
}

Status ClientRequestState::ExecLoadDataRequest() {
  if (exec_request().query_options.enable_async_load_data_execution) {
    // Transition the exec state out of INITIALIZED to PENDING to make available the
    // runtime profile for the DDL.
    UpdateNonErrorExecState(ExecState::PENDING);
    return Thread::Create("impala-server", "async_exec_thread_",
        &ClientRequestState::ExecLoadDataRequestImpl, this, true, &async_exec_thread_);
  }
  // sync execution
  ExecLoadDataRequestImpl(false /* do not use a worker thread */);
  return query_status_;
}

Status ClientRequestState::ExecShutdownRequest() {
  const TShutdownParams& request = exec_request().admin_request.shutdown_params;
  bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
  int port = backend_port_specified ?
      request.backend.port : FLAGS_krpc_port;
  // Use the local shutdown code path if the host is unspecified or if it exactly matches
  // the configured host/port. This avoids the possibility of RPC errors preventing
  // shutdown.
  if (!request.__isset.backend
      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_krpc_port)) {
    ShutdownStatusPB shutdown_status;
    int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
    RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
    SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
    return Status::OK();
  }
  // KRPC relies on resolved IP address, so convert hostname.
  IpAddr ip_address;
  Status ip_status = HostnameToIpAddr(request.backend.hostname, &ip_address);
  if (!ip_status.ok()) {
    VLOG(1) << "Could not convert hostname " << request.backend.hostname
            << " to ip address, error: " << ip_status.GetDetail();
    return ip_status;
  }
  // Find BackendId for the given remote ip address and port from cluster membership.
  // The searching is not efficient, but Shutdown Requests are not called frequently.
  // The BackendId is used to generate UDS address for Unix domain socket. Leave the
  // Id value as 0 if it's not found in cluster membership.
  // Note that UDS is only used when FLAGS_rpc_use_unix_domain_socket is set as true.
  UniqueIdPB backend_id;
  backend_id.set_hi(0);
  backend_id.set_lo(0);
  if (ExecEnv::GetInstance()->rpc_mgr()->IsKrpcUsingUDS()) {
    if (ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()
        == UdsAddressUniqueIdPB::BACKEND_ID) {
      ClusterMembershipMgr::SnapshotPtr membership_snapshot =
          ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot();
      DCHECK(membership_snapshot.get() != nullptr);
      for (const auto& it : membership_snapshot->current_backends) {
        // Compare resolved IP addresses and ports.
        if (it.second.ip_address() == ip_address && it.second.address().port() == port) {
          DCHECK(it.second.has_backend_id());
          backend_id = it.second.backend_id();
          break;
        }
      }
    }
  }
  string krpc_error = "RemoteShutdown() RPC failed: Network error";
  string krpc_error2 = "RemoteShutdown() RPC failed: Timed out";
  NetworkAddressPB krpc_addr = MakeNetworkAddressPB(ip_address, port, backend_id,
      ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId());
  std::unique_ptr<ControlServiceProxy> proxy;
  Status get_proxy_status =
      ControlService::GetProxy(krpc_addr, request.backend.hostname, &proxy);
  if (!get_proxy_status.ok()) {
    return Status(
        Substitute("Could not get Proxy to ControlService at $0 with error: $1.",
            NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg()));
  }
  RemoteShutdownParamsPB params;
  if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s);
  RemoteShutdownResultPB resp;
  VLOG_QUERY << "Sending Shutdown RPC to " << NetworkAddressPBToString(krpc_addr);
  const int num_retries = 3;
  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
  const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
  Status rpc_status = RpcMgr::DoRpcWithRetry(proxy,
      &ControlServiceProxy::RemoteShutdown, params, &resp, query_ctx_,
      "RemoteShutdown() RPC failed", num_retries, timeout_ms, backoff_time_ms,
      "CRS_SHUTDOWN_RPC");
  if (!rpc_status.ok()) {
    const string& msg = rpc_status.msg().msg();
    VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
               << " failed to send RPC to " << NetworkAddressPBToString(krpc_addr)
               << " :" << msg;
    string err_string = Substitute(
        "Rpc to $0 failed with error '$1'", NetworkAddressPBToString(krpc_addr), msg);
    // Attempt to detect if the failure is because of not using a KRPC port.
    if (backend_port_specified
        && (msg.find(krpc_error) != string::npos
            || msg.find(krpc_error2) != string::npos)) {
      // Prior to IMPALA-7985 :shutdown() used the backend port.
      err_string.append(" This may be because the port specified is wrong. You may have"
          " specified the backend (thrift) port which :shutdown() can no"
          " longer use. Please make sure the correct KRPC port is being"
          " used, or don't specify any port in the :shutdown() command.");
    }
    return Status(err_string);
  }
  Status shutdown_status(resp.status());
  RETURN_IF_ERROR(shutdown_status);
  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status())});
  return Status::OK();
}

Status ClientRequestState::ExecEventProcessorCmd() {
  catalog_op_executor_.reset(
      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
  const TEventProcessorCmdParams& params =
      exec_request().admin_request.event_processor_cmd_params;
  TSetEventProcessorStatusRequest request;
  TSetEventProcessorStatusResponse response;
  request.__set_params(params);
  request.__set_header(GetCatalogServiceRequestHeader());
  Status rpc_status = catalog_op_executor_->SetEventProcessorStatus(request, &response);
  if (!rpc_status.ok()) {
    VLOG_QUERY << "SetEventProcessorStatus failed: " << rpc_status.msg().msg();
    return rpc_status;
  }
  SetResultSet({response.info});
  return Status::OK();
}

void ClientRequestState::Finalize(const Status* cause) {
  UnRegisterCompletedRPCs();
  Cancel(cause, /*wait_until_finalized=*/true);
  MarkActive();
  // Make sure we join on wait_thread_ before we finish (and especially before this
  // object is destroyed).
  int64_t block_on_wait_time_us = 0;
  BlockOnWait(0, &block_on_wait_time_us);
  DCHECK_EQ(block_on_wait_time_us, 0);
  // Update latest observed Kudu timestamp stored in the session from the coordinator.
  // Needs to take the session_ lock which must not be taken while holding lock_, so this
  // must happen before taking lock_ below.
  Coordinator* coordinator = GetCoordinator();
  if (coordinator != nullptr) {
    // This is safe to access on coord_ after Wait() has been called.
    uint64_t latest_kudu_ts =
        coordinator->dml_exec_state()->GetKuduLatestObservedTimestamp();
    if (latest_kudu_ts > 0) {
      VLOG_RPC << "Updating session (id=" << PrintId(session_id()) << ") with latest "
               << "observed Kudu timestamp: " << latest_kudu_ts;
      lock_guard<mutex> session_lock(session_->lock);
      session_->kudu_latest_observed_ts = std::max<uint64_t>(
          session_->kudu_latest_observed_ts, latest_kudu_ts);
    }
  }
  // If the transaction didn't get committed by this point then we should just abort it.
  if (InTransaction()) {
    AbortTransaction();
  } else if (InKuduTransaction()) {
    AbortKuduTransaction();
  }
  UpdateEndTime();
  {
    unique_lock<mutex> l(lock_);
    // Update result set cache metrics, and update mem limit accounting before tearing
    // down the coordinator.
    ClearResultCache();
  }
  // Wait until the audit events are flushed.
  if (wait_thread_.get() != nullptr) {
    wait_thread_->Join();
    wait_thread_.reset();
  } else {
    // The query failed in the fe even before a wait thread is launched. Synchronously
    // flush log events to audit authorization errors, if any.
    LogQueryEvents();
  }
  DCHECK(wait_thread_.get() == nullptr);
  // Update the timeline here so that all of the above work is captured in the timeline.
  query_events_->MarkEvent("Unregister query");
  UnRegisterRemainingRPCs();
}

Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
  TResultSet metadata_op_result;
  // Like the other Exec(), fill out as much profile information as we're able to.
  summary_profile_->AddInfoString("Query Type", PrintValue(TStmtType::DDL));
  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
      &metadata_op_result));
  result_metadata_ = metadata_op_result.schema;
  request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
  UpdateNonErrorExecState(ExecState::RUNNING);
  return Status::OK();
}

Status ClientRequestState::WaitAsync() {
  // TODO: IMPALA-7396: thread creation fault inject is disabled because it is not
  // handled correctly.
  return Thread::Create("query-exec-state", "wait-thread",
      &ClientRequestState::Wait, this, &wait_thread_, false);
}

bool ClientRequestState::BlockOnWait(
    int64_t timeout_us, int64_t* block_on_wait_time_us) {
  DCHECK_GE(timeout_us, 0);
  unique_lock<mutex> l(lock_);
  *block_on_wait_time_us = 0;
  // Some metadata operations like GET_COLUMNS do not rely on WaitAsync() to launch
  // the wait thread. In such cases this method is expected to be a no-op.
  if (wait_thread_.get() == nullptr) return true;
  while (!is_wait_done_) {
    if (timeout_us == 0) {
      block_on_wait_cv_.Wait(l);
      return true;
    } else {
      MonotonicStopWatch wait_timeout_timer;
      wait_timeout_timer.Start();
      bool notified = block_on_wait_cv_.WaitFor(l, timeout_us);
      if (notified) {
        *block_on_wait_time_us = wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO;
      }
      return notified;
    }
  }
  return true;
}

void ClientRequestState::Wait() {
  // block until results are ready
  Status status = WaitInternal();
  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
  // how long Impala waits for the client to fetch rows. For other statements, track the
  // time until a Close() is received.
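  // Added note (not in the upstream source): MarkInactive() starts the client-wait
  // clock here; FetchRows() later brackets its work with MarkActive()/MarkInactive()
  // so that time spent materializing rows is not attributed to the client.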
  MarkInactive();
  {
    lock_guard<mutex> l(lock_);
    if (returns_result_set()) {
      query_events()->MarkEvent("Rows available");
    } else {
      query_events()->MarkEvent("Request finished");
      UpdateEndTime();
    }
    discard_result(UpdateQueryStatus(status));
  }
  if (status.ok()) {
    if (stmt_type() == TStmtType::DDL) {
      DCHECK(catalog_op_type() != TCatalogOpType::DDL || request_result_set_ != nullptr);
    }
    // It is possible the query already failed at this point and ExecState is ERROR. In
    // this case, the call to UpdateNonErrorExecState(FINISHED) does not change the
    // ExecState.
    UpdateNonErrorExecState(ExecState::FINISHED);
  }
  // UpdateQueryStatus() or UpdateNonErrorExecState() have updated exec_state_.
  DCHECK(exec_state() == ExecState::FINISHED || exec_state() == ExecState::ERROR
      || retry_state() == RetryState::RETRYING || retry_state() == RetryState::RETRIED);
  // Notify all the threads blocked on Wait() to finish and then log the query events,
  // if any.
  {
    unique_lock<mutex> l(lock_);
    is_wait_done_ = true;
  }
  block_on_wait_cv_.NotifyAll();
  LogQueryEvents();
}

Status ClientRequestState::WaitInternal() {
  // Explain requests have already populated the result set. Nothing to do here.
  if (exec_request().stmt_type == TStmtType::EXPLAIN) {
    return Status::OK();
  }
  // Wait until the query has passed through admission control and is either running or
  // cancelled or encountered an error.
  if (async_exec_thread_.get() != nullptr) async_exec_thread_->Join();
  vector<ChildQuery*> child_queries;
  Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
  {
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(query_status_);
    RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
  }
  if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");
  bool isCTAS = catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT;
  if (GetCoordinator() != NULL) {
    Status status = GetCoordinator()->Wait();
    if (UNLIKELY(!status.ok())) {
      if (InKuduTransaction()) AbortKuduTransaction();
      return status;
    }
    RETURN_IF_ERROR(UpdateCatalog());
  } else {
    // When the coordinator is not available for CTAS that requires a coordinator, check
    // further if the query has been cancelled. If so, return immediately as there will
    // be no query result available (IMPALA-11006).
    if (isCTAS) {
      lock_guard<mutex> l(lock_);
      if (is_cancelled_) return Status::CANCELLED;
    }
  }
  if (catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
    RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
  }
  if (!returns_result_set()) {
    // Queries that do not return a result are finished at this point. This includes
    // DML operations.
    eos_.Store(true);
  } else if (isCTAS) {
    SetCreateTableAsSelectResultSet();
  }
  return Status::OK();
}

Status ClientRequestState::FetchRows(const int32_t max_rows,
    QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
  // Pause the wait timer, since the client has instructed us to do work on its behalf.
  MarkActive();
  // ImpalaServer::FetchInternal has already taken our lock_
  discard_result(UpdateQueryStatus(
      FetchRowsInternal(max_rows, fetched_rows, block_on_wait_time_us)));
  MarkInactive();
  return query_status_;
}

Status ClientRequestState::RestartFetch() {
  // No result caching for this query. Restart is invalid.
  if (result_cache_max_size_ <= 0) {
    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
        "Restarting of fetch requires enabling of query result caching."));
  }
  // The cache overflowed on a previous fetch.
  if (result_cache_.get() == NULL) {
    stringstream ss;
    ss << "The query result cache exceeded its limit of " << result_cache_max_size_
       << " rows. Restarting the fetch is not possible.";
    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
  }
  // Reset fetch state to start over.
  eos_.Store(false);
  num_rows_fetched_ = 0;
  return Status::OK();
}

void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) {
  lock_guard<mutex> l(lock_);
  ExecState old_state = exec_state();
  static string error_msg = "Illegal state transition: $0 -> $1, query_id=$2";
  switch (new_state) {
    case ExecState::PENDING:
      DCHECK(old_state == ExecState::INITIALIZED)
          << Substitute(error_msg, ExecStateToString(old_state),
              ExecStateToString(new_state), PrintId(query_id()));
      UpdateExecState(new_state);
      break;
    case ExecState::RUNNING:
      // It is possible for FinishExecQueryOrDmlRequest() to attempt a transition to
      // running, even after the query has been cancelled with an error status (and is
      // thus in the ERROR ExecState). In this case, just ignore the transition attempt.
      if (old_state != ExecState::ERROR) {
        // DDL statements and metadata ops don't use the PENDING state, so a query can
        // transition directly from the INITIALIZED to RUNNING state.
        DCHECK(old_state == ExecState::INITIALIZED || old_state == ExecState::PENDING)
            << Substitute(error_msg, ExecStateToString(old_state),
                ExecStateToString(new_state), PrintId(query_id()));
        UpdateExecState(new_state);
      }
      break;
    case ExecState::FINISHED:
      // Only transition to the FINISHED state if the query has not failed. It is not
      // valid to transition from ERROR to FINISHED, so skip any attempt to do so.
      if (old_state != ExecState::ERROR) {
        // A query can transition from PENDING to FINISHED if it is cancelled by the
        // client.
        DCHECK(old_state == ExecState::PENDING || old_state == ExecState::RUNNING)
            << Substitute(error_msg, ExecStateToString(old_state),
                ExecStateToString(new_state), PrintId(query_id()));
        UpdateExecState(new_state);
      }
      break;
    default:
      DCHECK(false) << "A non-error state expected but got: "
                    << ExecStateToString(new_state);
  }
}

void ClientRequestState::SetOriginalId(const TUniqueId& original_id) {
  // Copy the TUniqueId query_id from the original query.
  original_id_ = make_unique<TUniqueId>(original_id);
  summary_profile_->AddInfoString("Original Query Id", PrintId(*original_id_));
}

void ClientRequestState::MarkAsRetrying(const Status& status) {
  retry_state_.Store(RetryState::RETRYING);
  summary_profile_->AddInfoString(
      RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRYING));
  // Set the query status.
  query_status_ = status;
  summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail());
  // The Query Status might be overwritten later if the retry fails. "Retry Cause"
  // preserves the original error that triggered the retry.
summary_profile_->AddInfoStringRedacted("Retry Cause", query_status_.GetDetail()); } Status ClientRequestState::UpdateQueryStatus(const Status& status, bool log_error) { // Preserve the first non-ok status if (!status.ok() && query_status_.ok()) { UpdateExecState(ExecState::ERROR); query_status_ = status; summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail()); if (log_error) VLOG_QUERY << status.GetDetail(); } return status; } Status ClientRequestState::FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) { // Wait() guarantees that we've transitioned at least to FINISHED state (and any // state beyond that should have a non-OK query_status_ set). DCHECK(exec_state() == ExecState::FINISHED); if (eos_.Load()) return Status::OK(); if (request_result_set_ != NULL) { int num_rows = 0; const vector<TResultRow>& all_rows = (*(request_result_set_.get())); // max_rows <= 0 means no limit while ((num_rows < max_rows || max_rows <= 0) && num_rows_fetched_ < all_rows.size()) { RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_])); ++num_rows_fetched_; ++num_rows; } eos_.Store(num_rows_fetched_ == all_rows.size()); return Status::OK(); } Coordinator* coordinator = GetCoordinator(); if (coordinator == nullptr) { return Status("Client tried to fetch rows on a query that produces no results."); } int32_t num_rows_fetched_from_cache = 0; if (result_cache_max_size_ > 0 && result_cache_ != NULL) { // Satisfy the fetch from the result cache if possible. int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows; num_rows_fetched_from_cache = fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size); num_rows_fetched_ += num_rows_fetched_from_cache; COUNTER_ADD(num_rows_fetched_from_cache_counter_, num_rows_fetched_from_cache); if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); } // Maximum number of rows to be fetched from the coord. int32_t max_coord_rows = max_rows; if (max_rows > 0) { DCHECK_LE(num_rows_fetched_from_cache, max_rows); max_coord_rows = max_rows - num_rows_fetched_from_cache; } { SCOPED_TIMER(row_materialization_timer_); size_t before = fetched_rows->size(); bool eos = false; // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ // (already held) ensures that we do not call coord_->GetNext() multiple times // concurrently. // TODO: Simplify this. lock_.unlock(); Status status = coordinator->GetNext(fetched_rows, max_coord_rows, &eos, block_on_wait_time_us); lock_.lock(); if (eos) eos_.Store(true); int num_fetched = fetched_rows->size() - before; DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute( "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows); num_rows_fetched_ += num_fetched; COUNTER_ADD(num_rows_fetched_counter_, num_fetched); RETURN_IF_ERROR(status); // Check if query status has changed during GetNext() call if (!query_status_.ok()) { eos_.Store(true); return query_status_; } } // Update the result cache if necessary. if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) { int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache; if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) { // Set the cache to NULL to indicate that adding the rows fetched from the coord // would exceed the bound of the cache, and therefore, RestartFetch() should fail. 
ClearResultCache(); return Status::OK(); } // We guess the size of the cache after adding fetched_rows by looking at the size of // fetched_rows itself, and use this estimate to confirm that the memtracker will // allow us to use this much extra memory. In fact, this might be an overestimate, as // the size of two result sets combined into one is not always the size of both result // sets added together (the best example is the null bitset for each column: it might // have only one entry in each result set, and as a result consume two bytes, but when // the result sets are combined, only one byte is needed). Therefore after we add the // new result set into the cache, we need to fix up the memory consumption to the // actual levels to ensure we don't 'leak' bytes that we aren't using. int64_t before = result_cache_->ByteSize(); // Upper-bound on memory required to add fetched_rows to the cache. int64_t delta_bytes = fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size()); MemTracker* query_mem_tracker = coordinator->query_mem_tracker(); // Count the cached rows towards the mem limit. if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) { string details("Failed to allocate memory for result cache."); return query_mem_tracker->MemLimitExceeded(nullptr, details, delta_bytes); } // Append all rows fetched from the coordinator into the cache. int num_rows_added = result_cache_->AddRows( fetched_rows, num_rows_fetched_from_cache, fetched_rows->size()); int64_t after = result_cache_->ByteSize(); // Confirm that this was not an underestimate of the memory required. DCHECK_GE(before + delta_bytes, after) << "Combined result sets consume more memory than both individually " << Substitute("(before: $0, delta_bytes: $1, after: $2)", before, delta_bytes, after); // Fix up the tracked values. if (before + delta_bytes > after) { query_mem_tracker->Release(before + delta_bytes - after); delta_bytes = after - before; } // Update result set cache metrics. ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added); ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes); } return Status::OK(); } void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized) { { lock_guard<mutex> lock(lock_); // If the query has reached a terminal state, no need to update the state. bool already_done = eos_.Load() || exec_state() == ExecState::ERROR; if (!already_done && cause != NULL) { DCHECK(!cause->ok()); discard_result(UpdateQueryStatus(*cause)); query_events_->MarkEvent("Cancelled"); DCHECK(exec_state() == ExecState::ERROR || retry_state() == RetryState::RETRYING); } // Avoid calling RemoteAdmissionControlClient::CancelAdmission() more than once, // since each call sends an extra RPC. if (!is_cancelled_) { admission_control_client_->CancelAdmission(); is_cancelled_ = true; } } // Release lock_ before doing cancellation work. // Cancel and close child queries before cancelling parent. 'lock_' should not be held // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation // involves RPCs and can take quite some time. child_query_executor_->Cancel(); // Ensure the parent query is cancelled if execution has started (if the query was not // started, cancellation is handled by the 'async-exec-thread'). 'lock_' should // not be held because cancellation involves RPCs and can block for a long time.
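// GetCoordinator() returns nullptr until Coordinator::Exec() has been called, so the
// call below is a no-op for queries whose execution never started.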
if (GetCoordinator() != nullptr) GetCoordinator()->Cancel(wait_until_finalized); } Status ClientRequestState::UpdateCatalog() { const TExecRequest& exec_req = exec_request(); if (!exec_req.__isset.query_exec_request || exec_req.query_exec_request.stmt_type != TStmtType::DML) { return Status::OK(); } query_events_->MarkEvent("DML data written"); SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer")); const TQueryExecRequest& query_exec_request = exec_req.query_exec_request; if (query_exec_request.__isset.finalize_params) { const TFinalizeParams& finalize_params = query_exec_request.finalize_params; TUpdateCatalogRequest catalog_update; catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl); catalog_update.__set_header(GetCatalogServiceRequestHeader()); if (exec_req.query_options.__isset.debug_action) { catalog_update.__set_debug_action(exec_req.query_options.debug_action); } DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state(); if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update, finalize_params)) { VLOG_QUERY << "No partitions altered, not updating metastore (query id: " << PrintId(query_id()) << ")"; } else { // TODO: We track partitions written to, not created, which means // that we do more work than is necessary, because written-to // partitions don't always require a metastore change. if (VLOG_IS_ON(1)) { vector<string> part_list; for (const auto& it : catalog_update.updated_partitions) part_list.push_back(it.first); VLOG_QUERY << "Updating metastore with " << catalog_update.updated_partitions.size() << " altered partitions (" << join(part_list, ", ") << ")"; } catalog_update.target_table = finalize_params.table_name; catalog_update.db_name = finalize_params.table_db; catalog_update.is_overwrite = finalize_params.is_overwrite; if (InTransaction()) { catalog_update.__set_transaction_id(finalize_params.transaction_id); catalog_update.__set_write_id(finalize_params.write_id); } if (finalize_params.__isset.iceberg_params) { TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation; catalog_update.__isset.iceberg_operation = true; if (!CreateIcebergCatalogOps(finalize_params, &cat_ice_op)) { VLOG_QUERY << "No Iceberg partitions altered, not updating metastore " << "(query id: " << PrintId(query_id()) << ")"; return Status::OK(); } } Status cnxn_status; CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(), *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &cnxn_status); RETURN_IF_ERROR(cnxn_status); VLOG_QUERY << "Executing FinalizeDml() using CatalogService"; TUpdateCatalogResponse resp; Status status = DebugAction(query_options(), "CLIENT_REQUEST_UPDATE_CATALOG"); if (status.ok()) { status = client.DoRpc( &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp); query_events_->MarkEvent("UpdateCatalog finished"); } if (resp.__isset.profile) { for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) { string timeline_name = catalog_timeline.name; // For CTAS, we already have a timeline for the CreateTable execution. // Use another name for the INSERT timeline.
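// e.g. if a timeline arrives with a name that is already present in the summary
// profile, it is stored under "<name> 2" instead.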
if (summary_profile_->GetEventSequence(timeline_name) != nullptr) { timeline_name += " 2"; } summary_profile_->AddEventSequence(timeline_name, catalog_timeline); } } if (status.ok()) status = Status(resp.result.status); if (!status.ok()) { if (InTransaction()) AbortTransaction(); LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); return status; } if (InTransaction()) { // UpdateCatalog() succeeded and already committed the transaction for us. int64_t txn_id = GetTransactionId(); if (!frontend_->UnregisterTransaction(txn_id).ok()) { LOG(ERROR) << Substitute("Failed to unregister transaction $0", txn_id); } ClearTransactionState(); query_events_->MarkEvent("Transaction committed"); } RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result, exec_req.query_options.sync_ddl, query_options(), query_events_)); } } else if (InKuduTransaction()) { // Commit the Kudu transaction. Clear transaction state if it's successful. // Otherwise, abort the Kudu transaction and clear transaction state. // Note that TQueryExecRequest.finalize_params is not set for inserts into Kudu // tables. Status status = CommitKuduTransaction(); if (UNLIKELY(!status.ok())) { AbortKuduTransaction(); LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); return status; } } query_events_->MarkEvent("DML Metastore update finished"); return Status::OK(); } bool ClientRequestState::CreateIcebergCatalogOps( const TFinalizeParams& finalize_params, TIcebergOperationParam* cat_ice_op) { DCHECK(cat_ice_op != nullptr); const TIcebergDmlFinalizeParams& ice_finalize_params = finalize_params.iceberg_params; DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state(); bool update_catalog = true; cat_ice_op->__set_operation(ice_finalize_params.operation); cat_ice_op->__set_initial_snapshot_id( ice_finalize_params.initial_snapshot_id); cat_ice_op->__set_spec_id(ice_finalize_params.spec_id); if (ice_finalize_params.operation == TIcebergOperation::INSERT) { cat_ice_op->__set_iceberg_data_files_fb( dml_exec_state->CreateIcebergDataFilesVector()); cat_ice_op->__set_is_overwrite(finalize_params.is_overwrite); if (cat_ice_op->iceberg_data_files_fb.empty()) update_catalog = false; } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) { cat_ice_op->__set_iceberg_delete_files_fb( dml_exec_state->CreateIcebergDeleteFilesVector()); cat_ice_op->__set_data_files_referenced_by_position_deletes( dml_exec_state->DataFilesReferencedByPositionDeletes()); if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false; } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) { cat_ice_op->__set_iceberg_data_files_fb( dml_exec_state->CreateIcebergDataFilesVector()); cat_ice_op->__set_iceberg_delete_files_fb( dml_exec_state->CreateIcebergDeleteFilesVector()); cat_ice_op->__set_data_files_referenced_by_position_deletes( dml_exec_state->DataFilesReferencedByPositionDeletes()); if (cat_ice_op->iceberg_delete_files_fb.empty()) { DCHECK(cat_ice_op->iceberg_data_files_fb.empty()); update_catalog = false; } } else if (ice_finalize_params.operation == TIcebergOperation::OPTIMIZE) { DCHECK(ice_finalize_params.__isset.optimize_params); const TIcebergOptimizeParams& optimize_params = ice_finalize_params.optimize_params; if (optimize_params.mode == TIcebergOptimizationMode::NOOP) { update_catalog = false; } else { cat_ice_op->__set_iceberg_data_files_fb( dml_exec_state->CreateIcebergDataFilesVector()); if (optimize_params.mode == TIcebergOptimizationMode::PARTIAL) {
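// For a partial optimization only a selected subset of the data files was rewritten,
// so also record which files were replaced.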
DCHECK(optimize_params.__isset.selected_data_files_without_deletes); cat_ice_op->__set_replaced_data_files_without_deletes( optimize_params.selected_data_files_without_deletes); } } } else if (ice_finalize_params.operation == TIcebergOperation::MERGE) { cat_ice_op->__set_iceberg_data_files_fb( dml_exec_state->CreateIcebergDataFilesVector()); cat_ice_op->__set_iceberg_delete_files_fb( dml_exec_state->CreateIcebergDeleteFilesVector()); cat_ice_op->__set_data_files_referenced_by_position_deletes( dml_exec_state->DataFilesReferencedByPositionDeletes()); if (cat_ice_op->iceberg_delete_files_fb.empty() && cat_ice_op->iceberg_data_files_fb.empty()) { update_catalog = false; } } if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement"); return update_catalog; } void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) { if (ddl_resp != NULL && ddl_resp->__isset.result_set) { result_metadata_ = ddl_resp->result_set.schema; request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows)); } } void ClientRequestState::SetResultSet(const vector<string>& results) { request_result_set_.reset(new vector<TResultRow>); request_result_set_->resize(results.size()); for (int i = 0; i < results.size(); ++i) { (*request_result_set_.get())[i].__isset.colVals = true; (*request_result_set_.get())[i].colVals.resize(1); (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]); } } void ClientRequestState::SetResultSet(const vector<string>& col1, const vector<string>& col2) { DCHECK_EQ(col1.size(), col2.size()); request_result_set_.reset(new vector<TResultRow>); request_result_set_->resize(col1.size()); for (int i = 0; i < col1.size(); ++i) { (*request_result_set_.get())[i].__isset.colVals = true; (*request_result_set_.get())[i].colVals.resize(2); (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); } } void ClientRequestState::SetResultSet(const vector<string>& col1, const vector<string>& col2, const vector<string>& col3) { DCHECK_EQ(col1.size(), col2.size()); DCHECK_EQ(col1.size(), col3.size()); request_result_set_.reset(new vector<TResultRow>); request_result_set_->resize(col1.size()); for (int i = 0; i < col1.size(); ++i) { (*request_result_set_.get())[i].__isset.colVals = true; (*request_result_set_.get())[i].colVals.resize(3); (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); } } void ClientRequestState::SetResultSet(const vector<string>& col1, const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) { DCHECK_EQ(col1.size(), col2.size()); DCHECK_EQ(col1.size(), col3.size()); DCHECK_EQ(col1.size(), col4.size()); request_result_set_.reset(new vector<TResultRow>); request_result_set_->resize(col1.size()); for (int i = 0; i < col1.size(); ++i) { (*request_result_set_.get())[i].__isset.colVals = true; (*request_result_set_.get())[i].colVals.resize(4); (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]); (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]); (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]); (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]); } } void ClientRequestState::SetCreateTableAsSelectResultSet() { DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT); int64_t total_num_rows_inserted = 0; // 
There will only be rows inserted in the case where a new table was created as part of this // operation. if (catalog_op_executor_->ddl_exec_response()->new_table_created) { DCHECK(GetCoordinator()); total_num_rows_inserted = GetCoordinator()->dml_exec_state()->GetNumModifiedRows(); } const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted); VLOG_QUERY << summary_msg; vector<string> results(1, summary_msg); SetResultSet(results); } void ClientRequestState::MarkInactive() { client_wait_sw_.Start(); lock_guard<mutex> l(expiration_data_lock_); last_active_time_ms_ = UnixMillis(); DCHECK(ref_count_ > 0) << "Invalid MarkInactive()"; --ref_count_; } void ClientRequestState::MarkActive() { client_wait_sw_.Stop(); int64_t elapsed_time = client_wait_sw_.ElapsedTime(); // If we have reached eos, then the query is already complete, // and we should not accumulate more client wait time. This mostly // impacts the finalization step, where the client is closing the // query and does not need any more rows. Fetching may have already // completed prior to this point, so finalization time should not // count in that case. If the fetch was incomplete, then the client // time should be counted for finalization as well. if (!eos()) { client_wait_timer_->Set(elapsed_time); // The first call is before any MarkInactive() call has run and produces // a zero-length sample. Skip this zero-length sample (but not any later // zero-length samples). if (elapsed_time != 0 || last_client_wait_time_ != 0) { int64_t current_wait_time = elapsed_time - last_client_wait_time_; client_wait_time_stats_->UpdateCounter(current_wait_time); } last_client_wait_time_ = elapsed_time; } lock_guard<mutex> l(expiration_data_lock_); last_active_time_ms_ = UnixMillis(); ++ref_count_; } std::optional<long> getIcebergSnapshotId(const TExecRequest& exec_req) { DCHECK(exec_req.__isset.catalog_op_request); DCHECK(exec_req.catalog_op_request.__isset.ddl_params); DCHECK(exec_req.catalog_op_request.ddl_params.__isset.compute_stats_params); const TComputeStatsParams& compute_stats_params = exec_req.catalog_op_request.ddl_params.compute_stats_params; if (compute_stats_params.__isset.iceberg_snapshot_id) { return std::optional<long>(compute_stats_params.iceberg_snapshot_id); } else { return {}; } } Status ClientRequestState::UpdateTableAndColumnStats( const vector<ChildQuery*>& child_queries) { DCHECK_GE(child_queries.size(), 1); DCHECK_LE(child_queries.size(), 2); catalog_op_executor_.reset( new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_)); // If there was no column stats query, pass in empty thrift structures to // ExecComputeStats(). Otherwise pass in the column stats result.
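// child_queries[0] always holds the table stats result; child_queries[1], when
// present, holds the column stats result (hence the size DCHECKs above).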
TTableSchema col_stats_schema; TRowSet col_stats_data; if (child_queries.size() > 1) { col_stats_schema = child_queries[1]->result_schema(); col_stats_data = child_queries[1]->result_data(); } const TExecRequest& exec_req = exec_request(); std::optional<long> snapshot_id = getIcebergSnapshotId(exec_req); Status status = catalog_op_executor_->ExecComputeStats( GetCatalogServiceRequestHeader(), exec_req.catalog_op_request, child_queries[0]->result_schema(), child_queries[0]->result_data(), col_stats_schema, col_stats_data, snapshot_id); AddCatalogTimeline(); { lock_guard<mutex> l(lock_); RETURN_IF_ERROR(UpdateQueryStatus(status)); } RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( *catalog_op_executor_->update_catalog_result(), exec_req.query_options.sync_ddl, query_options(), query_events_)); // Set the results to be reported to the client. SetResultSet(catalog_op_executor_->ddl_exec_response()); query_events_->MarkEvent("Metastore update finished"); return Status::OK(); } void ClientRequestState::ClearResultCache() { if (result_cache_ == nullptr) return; // Update result set cache metrics and mem limit accounting. ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size()); int64_t total_bytes = result_cache_->ByteSize(); ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes); Coordinator* coordinator = GetCoordinator(); if (coordinator != nullptr) { DCHECK(coordinator->query_mem_tracker() != nullptr); coordinator->query_mem_tracker()->Release(total_bytes); } result_cache_.reset(); } void ClientRequestState::UpdateExecState(ExecState exec_state) { { lock_guard<mutex> l(exec_state_lock_); exec_state_.Store(exec_state); summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState())); summary_profile_->AddInfoString("Impala Query State", ExecStateToString(exec_state)); } // Drop exec_state_lock_ before signalling exec_state_cv_.NotifyAll(); } void ClientRequestState::WaitForCompletionExecState() { if (query_options().long_polling_time_ms <= 0) return; int64_t timeout_us = query_options().long_polling_time_ms * MICROS_PER_MILLI; unique_lock<mutex> l(exec_state_lock_); timespec deadline; TimeFromNowMicros(timeout_us, &deadline); bool timed_out = false; while (exec_state() != ExecState::FINISHED && exec_state() != ExecState::ERROR && !timed_out) { timed_out = !exec_state_cv_.WaitUntil(l, deadline); } } TOperationState::type ClientRequestState::TOperationState() const { switch (exec_state()) { case ExecState::INITIALIZED: return TOperationState::INITIALIZED_STATE; case ExecState::PENDING: return TOperationState::PENDING_STATE; case ExecState::RUNNING: return TOperationState::RUNNING_STATE; case ExecState::FINISHED: return TOperationState::FINISHED_STATE; case ExecState::ERROR: return TOperationState::ERROR_STATE; default: { DCHECK(false) << "Add explicit translation for all used ExecState values"; return TOperationState::ERROR_STATE; } } } beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const { switch (exec_state()) { case ExecState::INITIALIZED: return beeswax::QueryState::CREATED; case ExecState::PENDING: return beeswax::QueryState::COMPILED; case ExecState::RUNNING: return beeswax::QueryState::RUNNING; case ExecState::FINISHED: return beeswax::QueryState::FINISHED; case ExecState::ERROR: return beeswax::QueryState::EXCEPTION; default: { DCHECK(false) << "Add explicit translation for all used ExecState values"; return beeswax::QueryState::EXCEPTION; } } } // It is safe to use 'coord_' directly for the following two 
methods since they are safe // to call concurrently with Coordinator::Exec(). See comments for 'coord_' and // 'coord_exec_called_' for more details. Status ClientRequestState::UpdateBackendExecStatus( const ReportExecStatusRequestPB& request, const TRuntimeProfileForest& thrift_profiles) { DCHECK(coord_.get()); return coord_->UpdateBackendExecStatus(request, thrift_profiles); } void ClientRequestState::UpdateFilter( const UpdateFilterParamsPB& params, RpcContext* context) { DCHECK(coord_.get()); coord_->UpdateFilter(params, context); } bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) { lock_guard<mutex> l(lock_); *query_status = query_status_; if (!query_status->ok()) return false; // Coord may be NULL for a SELECT with LIMIT 0. // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might // need to revisit this, since that might lead us to insert a row without a // coordinator, depending on how we choose to drive the table sink. Coordinator* coord = GetCoordinator(); if (coord == nullptr) return false; coord->dml_exec_state()->ToTDmlResult(dml_result); return true; } void ClientRequestState::WaitUntilRetried() { unique_lock<mutex> l(lock_); DCHECK(retry_state() != RetryState::NOT_RETRIED); while (retry_state() == RetryState::RETRYING) { block_until_retried_cv_.Wait(l); } DCHECK(retry_state() == RetryState::RETRIED || exec_state() == ExecState::ERROR); } void ClientRequestState::MarkAsRetried(const TUniqueId& retried_id) { DCHECK(retry_state() == RetryState::RETRYING) << Substitute("Illegal retry state transition: $0 -> RETRYING, query_id=$1", RetryStateToString(retry_state()), PrintId(query_id())); retry_state_.Store(RetryState::RETRIED); summary_profile_->AddInfoStringRedacted( RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRIED)); summary_profile_->AddInfoString("Retried Query Id", PrintId(retried_id)); UpdateExecState(ExecState::ERROR); block_until_retried_cv_.NotifyOne(); retried_id_ = make_unique<TUniqueId>(retried_id); } const string& ClientRequestState::effective_user() const { return GetEffectiveUser(query_ctx_.session); } void ClientRequestState::UpdateEndTime() { // Update the query's end time only if it wasn't set previously. if (end_time_us_.CompareAndSwap(0, UnixMicros())) { // Certain API clients expect Start Time and End Time to be date-time strings // of nanosecond precision, so we explicitly specify the precision here.
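// The underlying timestamps only have microsecond resolution, so the sub-microsecond
// digits are zero-padding rather than real precision.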
summary_profile_->AddInfoString( "End Time", ToStringFromUnixMicros(end_time_us(), TimePrecision::Nanosecond)); int64_t duration = end_time_us() - start_time_us(); summary_profile_->AddInfoString("Duration", Substitute("$0 ($1 us)", PrettyPrinter::Print(duration, TUnit::TIME_US), duration)); } } int64_t ClientRequestState::GetTransactionId() const { DCHECK(InTransaction()); return exec_request().query_exec_request.finalize_params.transaction_id; } bool ClientRequestState::InTransaction() const { return exec_request().query_exec_request.finalize_params.__isset.transaction_id && !transaction_closed_; } void ClientRequestState::AbortTransaction() { DCHECK(InTransaction()); if (frontend_->AbortTransaction(GetTransactionId()).ok()) { query_events_->MarkEvent("Transaction aborted"); } else { VLOG(1) << Substitute("Unable to abort transaction with id: $0", GetTransactionId()); } ClearTransactionState(); } void ClientRequestState::ClearTransactionState() { DCHECK(InTransaction()); transaction_closed_ = true; } bool ClientRequestState::InKuduTransaction() const { // If a Kudu transaction is open, TQueryExecRequest.query_ctx.is_kudu_transactional // is set to true by Frontend.doCreateExecRequest(). return (exec_request().query_exec_request.query_ctx.is_kudu_transactional && !transaction_closed_); } void ClientRequestState::AbortKuduTransaction() { DCHECK(InKuduTransaction()); if (frontend_->AbortKuduTransaction(query_ctx_.query_id).ok()) { query_events_->MarkEvent("Kudu transaction aborted"); } else { VLOG(1) << Substitute("Unable to abort Kudu transaction with query-id: $0", PrintId(query_ctx_.query_id)); } transaction_closed_ = true; } Status ClientRequestState::CommitKuduTransaction() { DCHECK(InKuduTransaction()); // Skip calling Commit() for a Kudu transaction when a debug action is set, so that // test code can explicitly control when Commit() is called. Status status = DebugAction(exec_request().query_options, "CRS_NOT_COMMIT_KUDU_TXN"); if (UNLIKELY(!status.ok())) { VLOG(1) << Substitute("Skipping commit of Kudu transaction with query-id: $0", PrintId(query_ctx_.query_id)); transaction_closed_ = true; return Status::OK(); } status = frontend_->CommitKuduTransaction(query_ctx_.query_id); if (status.ok()) { query_events_->MarkEvent("Kudu transaction committed"); transaction_closed_ = true; } else { VLOG(1) << Substitute("Unable to commit Kudu transaction with query-id: $0", PrintId(query_ctx_.query_id)); } return status; } void ClientRequestState::LogQueryEvents() { // Wait until the results are available. This guarantees the completion of non-QUERY // statements such as DDL/DML. Query events are logged if the query reaches a FINISHED // state. For certain query types, events are logged regardless of the query state. Status status; { lock_guard<mutex> l(lock_); status = query_status(); } bool log_events = true; switch (stmt_type()) { case TStmtType::QUERY: case TStmtType::DML: case TStmtType::DDL: case TStmtType::UNKNOWN: log_events = status.ok(); break; case TStmtType::EXPLAIN: case TStmtType::LOAD: case TStmtType::SET: case TStmtType::ADMIN_FN: default: break; } // Log audit events that are due to an AuthorizationException.
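// Note that an audit record is written even for a failed query if the failure was an
// authorization error, so that rejected access attempts are always recorded.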
if (parent_server_->IsAuditEventLoggingEnabled() && (Frontend::IsAuthorizationError(status) || log_events)) { // TODO: deal with an error status discard_result(LogAuditRecord(status)); } if (log_events && (parent_server_->AreQueryHooksEnabled() || parent_server_->IsLineageLoggingEnabled())) { // TODO: deal with an error status discard_result(LogLineageRecord()); } } Status ClientRequestState::LogAuditRecord(const Status& query_status) { const TExecRequest& request = exec_request(); stringstream ss; rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); writer.StartObject(); // Each log entry is a timestamp mapped to a JSON object. ss << UnixMillis(); writer.String(ss.str().c_str()); writer.StartObject(); writer.String("query_id"); writer.String(PrintId(query_id()).c_str()); writer.String("session_id"); writer.String(PrintId(session_id()).c_str()); writer.String("start_time"); writer.String(ToStringFromUnixMicros(start_time_us()).c_str()); writer.String("authorization_failure"); writer.Bool(Frontend::IsAuthorizationError(query_status)); writer.String("status"); writer.String(query_status.GetDetail().c_str()); writer.String("user"); writer.String(effective_user().c_str()); writer.String("impersonator"); if (do_as_user().empty()) { // If do_as_user() is empty, the "impersonator" field should be Null. writer.Null(); } else { // Otherwise, the delegator is the current connected user. writer.String(connected_user().c_str()); } writer.String("statement_type"); if (request.stmt_type == TStmtType::DDL) { if (request.catalog_op_request.op_type == TCatalogOpType::DDL) { writer.String(PrintValue(request.catalog_op_request.ddl_params.ddl_type).c_str()); } else { writer.String(PrintValue(request.catalog_op_request.op_type).c_str()); } } else { writer.String(PrintValue(request.stmt_type).c_str()); } writer.String("network_address"); writer.String(TNetworkAddressToString( session()->network_address).c_str()); writer.String("sql_statement"); string stmt = replace_all_copy(sql_stmt(), "\n", " "); Redact(&stmt); writer.String(stmt.c_str()); writer.String("catalog_objects"); writer.StartArray(); for (const TAccessEvent& event: request.access_events) { writer.StartObject(); writer.String("name"); writer.String(event.name.c_str()); writer.String("object_type"); writer.String(PrintValue(event.object_type).c_str()); writer.String("privilege"); writer.String(event.privilege.c_str()); writer.EndObject(); } writer.EndArray(); writer.EndObject(); writer.EndObject(); Status status = parent_server_->AppendAuditEntry(buffer.GetString()); if (!status.ok()) { LOG(ERROR) << "Unable to record audit event: " << status.GetDetail(); if (FLAGS_abort_on_failed_audit_event) { CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " "abort_on_failed_audit_event=true"); } } return status; } Status ClientRequestState::LogLineageRecord() { const TExecRequest& request = exec_request(); if (request.stmt_type == TStmtType::EXPLAIN || (!request.__isset.query_exec_request && !request.__isset.catalog_op_request)) { return Status::OK(); } TLineageGraph lineage_graph; if (request.__isset.query_exec_request && request.query_exec_request.__isset.lineage_graph) { lineage_graph = request.query_exec_request.lineage_graph; } else if (request.__isset.catalog_op_request && request.catalog_op_request.__isset.lineage_graph) { lineage_graph = request.catalog_op_request.lineage_graph; } else { return Status::OK(); } if (catalog_op_executor_ != nullptr && catalog_op_type() == TCatalogOpType::DDL) {
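// For DDL, the catalog response can carry metadata (table location, creation time)
// that is only known once the operation completed; patch it into the lineage graph.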
const TDdlExecResponse* response = ddl_exec_response(); // Set table location in the lineage graph. Currently, this is only set for external // tables in the frontend. if (response->__isset.table_location) { lineage_graph.__set_table_location(response->table_location); } // Update vertices that have -1 table_create_time for a newly created table/view. if (response->__isset.table_name && response->__isset.table_create_time) { for (auto &vertex: lineage_graph.vertices) { if (!vertex.__isset.metadata) continue; if (vertex.metadata.table_name == response->table_name && vertex.metadata.table_create_time == -1) { vertex.metadata.__set_table_create_time(response->table_create_time); } } } } // Set the query end time in TLineageGraph. Must use UNIX time directly rather than // e.g. converting from end_time() (IMPALA-4440). lineage_graph.__set_ended(UnixMillis() / 1000); string lineage_record; LineageUtil::TLineageToJSON(lineage_graph, &lineage_record); if (parent_server_->AreQueryHooksEnabled()) { // Invoke the QueryEventHooks. TQueryCompleteContext query_complete_context; query_complete_context.__set_lineage_string(lineage_record); const Status& status = ExecEnv::GetInstance()->frontend()->CallQueryCompleteHooks( query_complete_context); if (!status.ok()) { LOG(ERROR) << "Failed to send query lineage info to FE CallQueryCompleteHooks: " << status.GetDetail(); if (FLAGS_abort_on_failed_lineage_event) { CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " "abort_on_failed_lineage_event=true"); } } } // Lineage logfile writing is deprecated in favor of the // QueryEventHooks (see FE). This behavior is being retained // for now but may be removed in the future. if (parent_server_->IsLineageLoggingEnabled()) { const Status& status = parent_server_->AppendLineageEntry(lineage_record); if (!status.ok()) { LOG(ERROR) << "Unable to append query lineage record: " << status.GetDetail(); if (FLAGS_abort_on_failed_lineage_event) { CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to " "abort_on_failed_lineage_event=true"); } } return status; } return Status::OK(); } string ClientRequestState::ExecStateToString(ExecState state) { static const unordered_map<ClientRequestState::ExecState, const char*> exec_state_strings{{ClientRequestState::ExecState::INITIALIZED, "INITIALIZED"}, {ClientRequestState::ExecState::PENDING, "PENDING"}, {ClientRequestState::ExecState::RUNNING, "RUNNING"}, {ClientRequestState::ExecState::FINISHED, "FINISHED"}, {ClientRequestState::ExecState::ERROR, "ERROR"}}; return exec_state_strings.at(state); } string ClientRequestState::RetryStateToString(RetryState state) { static const unordered_map<ClientRequestState::RetryState, const char*> retry_state_strings{{ClientRequestState::RetryState::NOT_RETRIED, "NOT_RETRIED"}, {ClientRequestState::RetryState::RETRYING, "RETRYING"}, {ClientRequestState::RetryState::RETRIED, "RETRIED"}}; return retry_state_strings.at(state); } TCatalogServiceRequestHeader ClientRequestState::GetCatalogServiceRequestHeader() { TCatalogServiceRequestHeader header = TCatalogServiceRequestHeader(); header.__set_requesting_user(effective_user()); header.__set_client_ip(session()->network_address.hostname); header.__set_want_minimal_response(FLAGS_use_local_catalog); header.__set_redacted_sql_stmt( query_ctx_.client_request.__isset.redacted_stmt ?
query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt); header.__set_query_id(query_ctx_.query_id); header.__set_coordinator_hostname(FLAGS_hostname); return header; } void ClientRequestState::RegisterRPC() { RpcEventHandler::InvocationContext* rpc_context = RpcEventHandler::GetThreadRPCContext(); // The existence of rpc_context means that this is called from an RPC if (rpc_context) { lock_guard<mutex> l(lock_); if (track_rpcs_ && pending_rpcs_.find(rpc_context) == pending_rpcs_.end()) { rpc_context->Register(); pending_rpcs_.insert(rpc_context); rpc_count_->Add(1); } } } void ClientRequestState::UnRegisterCompletedRPCs() { lock_guard<mutex> l(lock_); for (auto iter = pending_rpcs_.begin(); iter != pending_rpcs_.end();) { RpcEventHandler::InvocationContext* rpc_context = *iter; uint64_t read_ns = 0, write_ns = 0; if (rpc_context->UnRegisterCompleted(read_ns, write_ns)) { rpc_read_timer_->Add(read_ns); rpc_write_timer_->Add(write_ns); iter = pending_rpcs_.erase(iter); } else { ++iter; } } } void ClientRequestState::UnRegisterRemainingRPCs() { lock_guard<mutex> l(lock_); for (auto rpc_context: pending_rpcs_) { rpc_context->UnRegister(); } track_rpcs_ = false; pending_rpcs_.clear(); } void ClientRequestState::CopyRPCs(ClientRequestState& from_request) { lock_guard<mutex> l_to(lock_); lock_guard<mutex> l_from(from_request.lock_); rpc_read_timer_->Add(from_request.rpc_read_timer_->value()); rpc_write_timer_->Add(from_request.rpc_write_timer_->value()); rpc_count_->Add(from_request.rpc_count_->value()); for (auto rpc_context: from_request.pending_rpcs_) { rpc_context->Register(); pending_rpcs_.insert(rpc_context); } } Status ClientRequestState::ExecMigrateRequest() { ExecMigrateRequestImpl(); SetResultSet({"Table has been migrated."}); return query_status_; } void ClientRequestState::ExecMigrateRequestImpl() { // A convert table request holds the query strings for the sub-queries. These are // populated by ConvertTableToIcebergStmt in the Frontend during analysis. const TConvertTableRequest& params = exec_request().convert_table_request; { RuntimeProfile* child_profile = RuntimeProfile::Create(&profile_pool_, "Child Queries 1"); profile_->AddChild(child_profile); vector<ChildQuery> child_queries; // Prepare: SET some table properties for the original table. RuntimeProfile* set_hdfs_table_profile = RuntimeProfile::Create( &profile_pool_, "Set properties for HDFS table query"); child_profile->AddChild(set_hdfs_table_profile); child_queries.emplace_back(params.set_hdfs_table_properties_query, this, parent_server_, set_hdfs_table_profile, &profile_pool_); // Prepare: RENAME the HDFS table to a temporary HDFS table. RuntimeProfile* rename_hdfs_table_profile = RuntimeProfile::Create( &profile_pool_, "Rename HDFS table query"); child_profile->AddChild(rename_hdfs_table_profile); child_queries.emplace_back(params.rename_hdfs_table_to_temporary_query, this, parent_server_, rename_hdfs_table_profile, &profile_pool_); // Prepare: REFRESH the temporary HDFS table. 
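// Refreshing reloads the renamed table's file metadata so that the following
// conversion step sees an up-to-date view of the data files.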
RuntimeProfile* refresh_hdfs_table_profile = RuntimeProfile::Create( &profile_pool_, "Refresh temporary HDFS table query"); child_profile->AddChild(refresh_hdfs_table_profile); child_queries.emplace_back(params.refresh_temporary_hdfs_table_query, this, parent_server_, refresh_hdfs_table_profile, &profile_pool_); // Execute the child queries. unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor()); RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries))); vector<ChildQuery*> completed_queries; Status query_status = query_executor->WaitForAll(&completed_queries); if (!query_status.ok()) AddTableResetHints(params, &query_status); { lock_guard<mutex> l(lock_); RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status)); } } // Create an external Iceberg table using the data of the HDFS table. Status status = frontend_->Convert(exec_request()); if (!status.ok()) AddTableResetHints(params, &status); { lock_guard<mutex> l(lock_); RETURN_VOID_IF_ERROR(UpdateQueryStatus(status)); } { RuntimeProfile* child_profile = RuntimeProfile::Create(&profile_pool_, "Child Queries 2"); profile_->AddChild(child_profile); vector<ChildQuery> child_queries; if (params.__isset.create_iceberg_table_query) { // Prepare: CREATE the Iceberg table that inherits HDFS table location. RuntimeProfile* create_iceberg_table_profile = RuntimeProfile::Create( &profile_pool_, "Create Iceberg table query"); child_profile->AddChild(create_iceberg_table_profile); child_queries.emplace_back(params.create_iceberg_table_query, this, parent_server_, create_iceberg_table_profile, &profile_pool_); } else { // Prepare: Invalidate metadata for tables in Hive catalog to guarantee that it is // propagated instantly to all coordinators. RuntimeProfile* invalidate_metadata_profile = RuntimeProfile::Create( &profile_pool_, "Invalidate metadata Iceberg table query"); child_profile->AddChild(invalidate_metadata_profile); child_queries.emplace_back(params.invalidate_metadata_query, this, parent_server_, invalidate_metadata_profile, &profile_pool_); } if (params.__isset.post_create_alter_table_query) { // Prepare: ALTER TABLE query after creating the Iceberg table. RuntimeProfile* post_create_alter_table_profile = RuntimeProfile::Create( &profile_pool_, "ALTER TABLE after create Iceberg table query"); child_profile->AddChild(post_create_alter_table_profile); child_queries.emplace_back(params.post_create_alter_table_query, this, parent_server_, post_create_alter_table_profile, &profile_pool_); } // Prepare: DROP the temporary HDFS table.
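// Dropping the renamed source table is safe at this point since the Iceberg table
// created above has taken over the data; presumably the table properties set in the
// first child query ensure the DROP does not remove the underlying data files.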
RuntimeProfile* drop_tmp_hdfs_table_profile = RuntimeProfile::Create( &profile_pool_, "Drop temporary HDFS table query"); child_profile->AddChild(drop_tmp_hdfs_table_profile); child_queries.emplace_back(params.drop_temporary_hdfs_table_query, this, parent_server_, drop_tmp_hdfs_table_profile, &profile_pool_); // Execute the child queries. unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor()); RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries))); vector<ChildQuery*> completed_queries; Status query_status = query_executor->WaitForAll(&completed_queries); { lock_guard<mutex> l(lock_); RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status)); } } } Status ClientRequestState::TryKillQueryLocally( const TUniqueId& query_id, const string& requesting_user, bool is_admin) { Status status = ExecEnv::GetInstance()->impala_server()->KillQuery( query_id, requesting_user, is_admin); if (status.ok()) { SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))}); return query_status_; } return status; } Status ClientRequestState::TryKillQueryRemotely( const TUniqueId& query_id, const KillQueryRequestPB& request) { // The initial status should be INVALID_QUERY_HANDLE so that if there is no other // coordinator in the cluster, it will be the status to return. Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id)); ExecutorGroup all_coordinators = ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators; // Skipping the current impalad. unique_ptr<ExecutorGroup> other_coordinators{ExecutorGroup::GetFilteredExecutorGroup( &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})}; // If we get an RPC error, instead of returning immediately, we record it and move // on to the next coordinator. Status rpc_errors = Status::OK(); for (const auto& backend : other_coordinators->GetAllExecutorDescriptors()) { // The logic here is similar to ExecShutdownRequest(). NetworkAddressPB krpc_addr = MakeNetworkAddressPB(backend.ip_address(), backend.address().port(), backend.backend_id(), ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()); VLOG_QUERY << "Sending KillQuery() RPC to " << NetworkAddressPBToString(krpc_addr); unique_ptr<ControlServiceProxy> proxy; Status get_proxy_status = ControlService::GetProxy(krpc_addr, backend.address().hostname(), &proxy); if (!get_proxy_status.ok()) { Status get_proxy_status_to_report{Substitute( "KillQuery: Could not get Proxy to ControlService at $0 with error: $1.", NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())}; rpc_errors.MergeStatus(get_proxy_status_to_report); LOG(ERROR) << get_proxy_status_to_report.GetDetail(); continue; } KillQueryResponsePB response; const int num_retries = 3; const int64_t timeout_ms = 10 * MILLIS_PER_SEC; const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC; // Currently, a KILL QUERY statement is not interruptible. Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::KillQuery, request, &response, query_ctx_, "KillQuery() RPC failed", num_retries, timeout_ms, backoff_time_ms, "CRS_KILL_QUERY_RPC"); if (!rpc_status.ok()) { LOG(ERROR) << rpc_status.GetDetail(); rpc_errors.MergeStatus(rpc_status); continue; } // Currently, we only support killing one query in one KILL QUERY statement. DCHECK_EQ(response.statuses_size(), 1); status = Status(response.statuses(0)); if (status.ok()) { // Kill succeeded.
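// Exactly one coordinator owns a query, so the first successful response is
// authoritative and there is no need to try the remaining coordinators.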
VLOG_QUERY << "KillQuery: Found the coordinator at " << NetworkAddressPBToString(krpc_addr); SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))}); return query_status_; } else if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { LOG(ERROR) << "KillQuery: Found the coordinator at " << NetworkAddressPBToString(krpc_addr) << " but failed to kill the query: " << status.GetDetail(); // Kill failed, but we found the coordinator of the query. return status; } } // We did't find the coordinator of the query after trying all other coordinators. // If there is any RPC error, return it. if (!rpc_errors.ok()) { return rpc_errors; } // If there is no RPC error, return INVALID_QUERY_HANDLE. return status; } Status ClientRequestState::ExecKillQueryRequest() { TUniqueId query_id = exec_request().kill_query_request.query_id; string requesting_user = exec_request().kill_query_request.requesting_user; bool is_admin = exec_request().kill_query_request.is_admin; VLOG_QUERY << "Exec KillQuery: query_id=" << PrintId(query_id) << ", requesting_user=" << requesting_user << ", is_admin=" << is_admin; // First try cancelling the query locally. Status status = TryKillQueryLocally(query_id, requesting_user, is_admin); if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { return status; } // The current impalad is NOT the coordinator of the query. Now we have to broadcast // the kill request to all other coordinators. UniqueIdPB query_id_pb; TUniqueIdToUniqueIdPB(query_id, &query_id_pb); KillQueryRequestPB request; *request.add_query_ids() = query_id_pb; *request.mutable_requesting_user() = requesting_user; request.set_is_admin(is_admin); status = TryKillQueryRemotely(query_id, request); if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) { return status; } // All the error messages are "Invalid or unknown query handle". return Status("Could not find query on any coordinator."); } void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params, Status* status) const { string table_reset_hint("Your table might have been renamed. To reset the name " "try running:\n" + params.reset_table_name_query + ";"); status->MergeStatus(Status(table_reset_hint)); } int64_t ClientRequestState::num_rows_fetched_counter() const { if (LIKELY(num_rows_fetched_counter_ != nullptr)) { return num_rows_fetched_counter_->value(); } return 0; } int64_t ClientRequestState::row_materialization_rate() const { if (LIKELY(row_materialization_rate_ != nullptr)) { return row_materialization_rate_->value(); } return 0; } int64_t ClientRequestState::row_materialization_timer() const { if (LIKELY(row_materialization_timer_ != nullptr)) { return row_materialization_timer_->value(); } return 0; } void ClientRequestState::AddCatalogTimeline() { if (catalog_op_executor_ != nullptr && catalog_op_executor_->catalog_profile() != nullptr) { for (const TEventSequence& catalog_timeline : catalog_op_executor_->catalog_profile()->event_sequences) { summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline); } } } }