in be/src/exec/system-table-scanner.cc [175:429]
Status QueryScanner::MaterializeNextTuple(
MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc) {
using impala::workloadmgmt::IncludeField;
DCHECK(!query_records_.empty());
const QueryStateExpanded& query = *query_records_.front();
const QueryStateRecord& record = *query.base_state;
ExecEnv* exec_env = ExecEnv::GetInstance();
// Verify there are no clustering columns (partitions) to offset col_pos.
DCHECK_EQ(0, tuple_desc->table_desc()->num_clustering_cols());
for (const SlotDescriptor* slot_desc : tuple_desc->slots()) {
void* slot = tuple->GetSlot(slot_desc->tuple_offset());
switch (slot_desc->col_pos()) {
case TQueryTableColumn::CLUSTER_ID:
RETURN_IF_ERROR(WriteStringSlot(FLAGS_cluster_id, pool, slot));
break;
case TQueryTableColumn::QUERY_ID:
RETURN_IF_ERROR(WriteStringSlot(PrintId(record.id), pool, slot));
break;
case TQueryTableColumn::SESSION_ID:
RETURN_IF_ERROR(WriteStringSlot(PrintId(query.session_id), pool, slot));
break;
case TQueryTableColumn::SESSION_TYPE:
RETURN_IF_ERROR(WriteStringSlot(to_string(query.session_type), pool, slot));
break;
case TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION:
if (query.session_type == TSessionType::HIVESERVER2) {
RETURN_IF_ERROR(WriteStringSlot(
query.hiveserver2_protocol_version_formatted(), pool, slot));
}
break;
case TQueryTableColumn::DB_USER:
RETURN_IF_ERROR(WriteStringSlot(record.effective_user, pool, slot));
break;
case TQueryTableColumn::DB_USER_CONNECTION:
RETURN_IF_ERROR(WriteStringSlot(query.db_user_connection, pool, slot));
break;
case TQueryTableColumn::DB_NAME:
RETURN_IF_ERROR(WriteStringSlot(record.default_db, pool, slot));
break;
case TQueryTableColumn::IMPALA_COORDINATOR:
RETURN_IF_ERROR(WriteStringSlot(
TNetworkAddressToString(exec_env->configured_backend_address()), pool, slot));
break;
case TQueryTableColumn::QUERY_STATUS:
RETURN_IF_ERROR(WriteStringSlot(record.query_status.ok() ?
"OK" : record.query_status.msg().msg(), pool, slot));
break;
case TQueryTableColumn::QUERY_STATE:
RETURN_IF_ERROR(WriteStringSlot(record.query_state, pool, slot));
break;
case TQueryTableColumn::IMPALA_QUERY_END_STATE:
RETURN_IF_ERROR(WriteStringSlot(query.impala_query_end_state, pool, slot));
break;
case TQueryTableColumn::QUERY_TYPE:
RETURN_IF_ERROR(WriteStringSlot(to_string(record.stmt_type), pool, slot));
break;
case TQueryTableColumn::NETWORK_ADDRESS:
RETURN_IF_ERROR(WriteStringSlot(
TNetworkAddressToString(query.client_address), pool, slot));
break;
case TQueryTableColumn::START_TIME_UTC:
WriteUnixTimestampSlot(record.start_time_us, slot);
break;
case TQueryTableColumn::TOTAL_TIME_MS: {
const int64_t end_time_us =
record.end_time_us > 0 ? record.end_time_us : UnixMicros();
double duration_us = (end_time_us - record.start_time_us) / MICROS_TO_MILLIS;
WriteDecimalSlot(slot_desc->type(), duration_us, slot);
break;
}
case TQueryTableColumn::QUERY_OPTS_CONFIG:
RETURN_IF_ERROR(WriteStringSlot(
DebugQueryOptions(query.query_options), pool, slot));
break;
case TQueryTableColumn::RESOURCE_POOL:
RETURN_IF_ERROR(WriteStringSlot(record.resource_pool, pool, slot));
break;
case TQueryTableColumn::PER_HOST_MEM_ESTIMATE:
WriteBigIntSlot(query.per_host_mem_estimate, slot);
break;
case TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE:
WriteBigIntSlot(query.dedicated_coord_mem_estimate, slot);
break;
case TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES:
if (!query.per_host_state.empty()) {
stringstream ss;
for (const auto& state : query.per_host_state) {
ss << TNetworkAddressToString(state.first) << "="
<< state.second.fragment_instance_count << ",";
}
string s = ss.str();
s.pop_back();
RETURN_IF_ERROR(WriteStringSlot(s, pool, slot));
}
break;
case TQueryTableColumn::BACKENDS_COUNT:
DCHECK_LE(query.per_host_state.size(), numeric_limits<int32_t>::max());
WriteIntSlot(query.per_host_state.size(), slot);
break;
case TQueryTableColumn::ADMISSION_RESULT:
RETURN_IF_ERROR(WriteStringSlot(query.admission_result, pool, slot));
break;
case TQueryTableColumn::CLUSTER_MEMORY_ADMITTED:
WriteBigIntSlot(record.cluster_mem_est, slot);
break;
case TQueryTableColumn::EXECUTOR_GROUP:
RETURN_IF_ERROR(WriteStringSlot(query.executor_group, pool, slot));
break;
case TQueryTableColumn::EXECUTOR_GROUPS:
RETURN_IF_ERROR(WriteStringSlot(query.executor_groups, pool, slot));
break;
case TQueryTableColumn::EXEC_SUMMARY:
RETURN_IF_ERROR(WriteStringSlot(query.exec_summary, pool, slot));
break;
case TQueryTableColumn::NUM_ROWS_FETCHED:
WriteBigIntSlot(record.num_rows_fetched, slot);
break;
case TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC:
WriteBigIntSlot(query.row_materialization_rate, slot);
break;
case TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS:
WriteDecimalSlot(slot_desc->type(),
query.row_materialization_time / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::COMPRESSED_BYTES_SPILLED:
WriteBigIntSlot(query.compressed_bytes_spilled, slot);
break;
case TQueryTableColumn::EVENT_PLANNING_FINISHED:
WriteEvent(query, slot_desc, slot, QueryEvent::PLANNING_FINISHED);
break;
case TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION:
WriteEvent(query, slot_desc, slot, QueryEvent::SUBMIT_FOR_ADMISSION);
break;
case TQueryTableColumn::EVENT_COMPLETED_ADMISSION:
WriteEvent(query, slot_desc, slot, QueryEvent::COMPLETED_ADMISSION);
break;
case TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED:
WriteEvent(query, slot_desc, slot, QueryEvent::ALL_BACKENDS_STARTED);
break;
case TQueryTableColumn::EVENT_ROWS_AVAILABLE:
WriteEvent(query, slot_desc, slot, QueryEvent::ROWS_AVAILABLE);
break;
case TQueryTableColumn::EVENT_FIRST_ROW_FETCHED:
WriteEvent(query, slot_desc, slot, QueryEvent::FIRST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_LAST_ROW_FETCHED:
WriteEvent(query, slot_desc, slot, QueryEvent::LAST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_UNREGISTER_QUERY:
WriteEvent(query, slot_desc, slot, QueryEvent::UNREGISTER_QUERY);
break;
case TQueryTableColumn::READ_IO_WAIT_TOTAL_MS:
WriteDecimalSlot(slot_desc->type(),
query.read_io_wait_time_total / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::READ_IO_WAIT_MEAN_MS:
WriteDecimalSlot(slot_desc->type(),
query.read_io_wait_time_mean / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::BYTES_READ_CACHE_TOTAL:
WriteBigIntSlot(query.bytes_read_cache_total, slot);
break;
case TQueryTableColumn::BYTES_READ_TOTAL:
WriteBigIntSlot(query.bytes_read_total, slot);
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MIN:
if (auto min_elem = min_element(query.per_host_state.cbegin(),
query.per_host_state.cend(), PerHostPeakMemoryComparator);
LIKELY(min_elem != query.per_host_state.cend())) {
WriteBigIntSlot(min_elem->second.peak_memory_usage, slot);
}
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MAX:
if (auto max_elem = max_element(query.per_host_state.cbegin(),
query.per_host_state.cend(), PerHostPeakMemoryComparator);
LIKELY(max_elem != query.per_host_state.cend())) {
WriteBigIntSlot(max_elem->second.peak_memory_usage, slot);
}
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MEAN:
if (LIKELY(!query.per_host_state.empty())) {
int64_t calc_mean = 0;
for (const auto& host : query.per_host_state) {
calc_mean += host.second.peak_memory_usage;
}
calc_mean /= query.per_host_state.size();
WriteBigIntSlot(calc_mean, slot);
}
break;
case TQueryTableColumn::SQL:
RETURN_IF_ERROR(WriteStringSlot(query.redacted_sql, pool, slot));
break;
case TQueryTableColumn::PLAN:
RETURN_IF_ERROR(WriteStringSlot(
trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot));
break;
case TQueryTableColumn::TABLES_QUERIED:
if (!query.tables.empty()) {
RETURN_IF_ERROR(WriteStringSlot(PrintTableList(query.tables), pool, slot));
}
break;
case TQueryTableColumn::SELECT_COLUMNS:
if (!query.select_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::SELECT_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.select_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::WHERE_COLUMNS:
if (!query.where_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::WHERE_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.where_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::JOIN_COLUMNS:
if (!query.join_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::JOIN_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.join_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::AGGREGATE_COLUMNS:
if (!query.aggregate_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::AGGREGATE_COLUMNS))) {
RETURN_IF_ERROR(
WriteStringSlot(join(query.aggregate_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::ORDERBY_COLUMNS:
if (!query.orderby_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::ORDERBY_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.orderby_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::COORDINATOR_SLOTS:
if (LIKELY(IncludeField(TQueryTableColumn::type::COORDINATOR_SLOTS))) {
WriteBigIntSlot(record.coordinator_slots, slot);
}
break;
case TQueryTableColumn::EXECUTOR_SLOTS:
if (LIKELY(IncludeField(TQueryTableColumn::type::EXECUTOR_SLOTS))) {
WriteBigIntSlot(record.executor_slots, slot);
}
break;
default:
LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << ") added"
" to table " << table_name_ << "; check if a coordinator was upgraded";
tuple->SetNull(slot_desc->null_indicator_offset());
break;
}
}
query_records_.pop_front();
if (query_records_.empty()) eos_ = true;
return Status::OK();
}