Status QueryScanner::MaterializeNextTuple()

in be/src/exec/system-table-scanner.cc [175:429]


/// Materializes the front entry of 'query_records_' into 'tuple', one slot per column
/// in 'tuple_desc', then pops that entry. Sets 'eos_' once no records remain.
///
/// Column values come from the expanded query state ('query') and its base
/// QueryStateRecord ('record'). Variable-length string data is copied into 'pool'.
/// Some columns are written only when a value exists, for example:
///  - the HS2 protocol version for non-HS2 sessions;
///  - per-host aggregates when there are no hosts;
///  - empty column lists.
/// Those slots keep whatever the caller stored in the tuple beforehand
/// (NOTE(review): presumably a zero-initialized tuple; confirm against caller).
/// Columns with an unrecognized position are set to NULL and logged.
///
/// Returns an error Status if writing a string slot fails.
/// Requires 'query_records_' to be non-empty.
Status QueryScanner::MaterializeNextTuple(
    MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc) {
  using impala::workloadmgmt::IncludeField;
  DCHECK(!query_records_.empty());
  const QueryStateExpanded& query = *query_records_.front();
  const QueryStateRecord& record = *query.base_state;
  ExecEnv* exec_env = ExecEnv::GetInstance();
  // Verify there are no clustering columns (partitions) to offset col_pos.
  DCHECK_EQ(0, tuple_desc->table_desc()->num_clustering_cols());
  for (const SlotDescriptor* slot_desc : tuple_desc->slots()) {
    void* slot = tuple->GetSlot(slot_desc->tuple_offset());

    switch (slot_desc->col_pos()) {
      case TQueryTableColumn::CLUSTER_ID:
        RETURN_IF_ERROR(WriteStringSlot(FLAGS_cluster_id, pool, slot));
        break;
      case TQueryTableColumn::QUERY_ID:
        RETURN_IF_ERROR(WriteStringSlot(PrintId(record.id), pool, slot));
        break;
      case TQueryTableColumn::SESSION_ID:
        RETURN_IF_ERROR(WriteStringSlot(PrintId(query.session_id), pool, slot));
        break;
      case TQueryTableColumn::SESSION_TYPE:
        RETURN_IF_ERROR(WriteStringSlot(to_string(query.session_type), pool, slot));
        break;
      case TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION:
        // Only HiveServer2 sessions carry a protocol version; other session types
        // leave this slot untouched.
        if (query.session_type == TSessionType::HIVESERVER2) {
          RETURN_IF_ERROR(WriteStringSlot(
              query.hiveserver2_protocol_version_formatted(), pool, slot));
        }
        break;
      case TQueryTableColumn::DB_USER:
        RETURN_IF_ERROR(WriteStringSlot(record.effective_user, pool, slot));
        break;
      case TQueryTableColumn::DB_USER_CONNECTION:
        RETURN_IF_ERROR(WriteStringSlot(query.db_user_connection, pool, slot));
        break;
      case TQueryTableColumn::DB_NAME:
        RETURN_IF_ERROR(WriteStringSlot(record.default_db, pool, slot));
        break;
      case TQueryTableColumn::IMPALA_COORDINATOR:
        RETURN_IF_ERROR(WriteStringSlot(
            TNetworkAddressToString(exec_env->configured_backend_address()), pool, slot));
        break;
      case TQueryTableColumn::QUERY_STATUS:
        RETURN_IF_ERROR(WriteStringSlot(record.query_status.ok() ?
            "OK" : record.query_status.msg().msg(), pool, slot));
        break;
      case TQueryTableColumn::QUERY_STATE:
        RETURN_IF_ERROR(WriteStringSlot(record.query_state, pool, slot));
        break;
      case TQueryTableColumn::IMPALA_QUERY_END_STATE:
        RETURN_IF_ERROR(WriteStringSlot(query.impala_query_end_state, pool, slot));
        break;
      case TQueryTableColumn::QUERY_TYPE:
        RETURN_IF_ERROR(WriteStringSlot(to_string(record.stmt_type), pool, slot));
        break;
      case TQueryTableColumn::NETWORK_ADDRESS:
        RETURN_IF_ERROR(WriteStringSlot(
            TNetworkAddressToString(query.client_address), pool, slot));
        break;
      case TQueryTableColumn::START_TIME_UTC:
        WriteUnixTimestampSlot(record.start_time_us, slot);
        break;
      case TQueryTableColumn::TOTAL_TIME_MS: {
        // A query that has not ended yet (end_time_us == 0) is measured up to "now".
        const int64_t end_time_us =
            record.end_time_us > 0 ? record.end_time_us : UnixMicros();
        // Elapsed time converted from microseconds to milliseconds.
        const double duration_ms =
            (end_time_us - record.start_time_us) / MICROS_TO_MILLIS;
        WriteDecimalSlot(slot_desc->type(), duration_ms, slot);
        break;
      }
      case TQueryTableColumn::QUERY_OPTS_CONFIG:
        RETURN_IF_ERROR(WriteStringSlot(
            DebugQueryOptions(query.query_options), pool, slot));
        break;
      case TQueryTableColumn::RESOURCE_POOL:
        RETURN_IF_ERROR(WriteStringSlot(record.resource_pool, pool, slot));
        break;
      case TQueryTableColumn::PER_HOST_MEM_ESTIMATE:
        WriteBigIntSlot(query.per_host_mem_estimate, slot);
        break;
      case TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE:
        WriteBigIntSlot(query.dedicated_coord_mem_estimate, slot);
        break;
      case TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES:
        // Comma-separated "host:port=count" pairs; left untouched when no hosts.
        if (!query.per_host_state.empty()) {
          stringstream ss;
          const char* separator = "";
          for (const auto& state : query.per_host_state) {
            ss << separator << TNetworkAddressToString(state.first) << "="
               << state.second.fragment_instance_count;
            separator = ",";
          }
          RETURN_IF_ERROR(WriteStringSlot(ss.str(), pool, slot));
        }
        break;
      case TQueryTableColumn::BACKENDS_COUNT:
        DCHECK_LE(query.per_host_state.size(), numeric_limits<int32_t>::max());
        WriteIntSlot(query.per_host_state.size(), slot);
        break;
      case TQueryTableColumn::ADMISSION_RESULT:
        RETURN_IF_ERROR(WriteStringSlot(query.admission_result, pool, slot));
        break;
      case TQueryTableColumn::CLUSTER_MEMORY_ADMITTED:
        WriteBigIntSlot(record.cluster_mem_est, slot);
        break;
      case TQueryTableColumn::EXECUTOR_GROUP:
        RETURN_IF_ERROR(WriteStringSlot(query.executor_group, pool, slot));
        break;
      case TQueryTableColumn::EXECUTOR_GROUPS:
        RETURN_IF_ERROR(WriteStringSlot(query.executor_groups, pool, slot));
        break;
      case TQueryTableColumn::EXEC_SUMMARY:
        RETURN_IF_ERROR(WriteStringSlot(query.exec_summary, pool, slot));
        break;
      case TQueryTableColumn::NUM_ROWS_FETCHED:
        WriteBigIntSlot(record.num_rows_fetched, slot);
        break;
      case TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC:
        WriteBigIntSlot(query.row_materialization_rate, slot);
        break;
      case TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS:
        WriteDecimalSlot(slot_desc->type(),
            query.row_materialization_time / NANOS_TO_MILLIS, slot);
        break;
      case TQueryTableColumn::COMPRESSED_BYTES_SPILLED:
        WriteBigIntSlot(query.compressed_bytes_spilled, slot);
        break;
      case TQueryTableColumn::EVENT_PLANNING_FINISHED:
        WriteEvent(query, slot_desc, slot, QueryEvent::PLANNING_FINISHED);
        break;
      case TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION:
        WriteEvent(query, slot_desc, slot, QueryEvent::SUBMIT_FOR_ADMISSION);
        break;
      case TQueryTableColumn::EVENT_COMPLETED_ADMISSION:
        WriteEvent(query, slot_desc, slot, QueryEvent::COMPLETED_ADMISSION);
        break;
      case TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED:
        WriteEvent(query, slot_desc, slot, QueryEvent::ALL_BACKENDS_STARTED);
        break;
      case TQueryTableColumn::EVENT_ROWS_AVAILABLE:
        WriteEvent(query, slot_desc, slot, QueryEvent::ROWS_AVAILABLE);
        break;
      case TQueryTableColumn::EVENT_FIRST_ROW_FETCHED:
        WriteEvent(query, slot_desc, slot, QueryEvent::FIRST_ROW_FETCHED);
        break;
      case TQueryTableColumn::EVENT_LAST_ROW_FETCHED:
        WriteEvent(query, slot_desc, slot, QueryEvent::LAST_ROW_FETCHED);
        break;
      case TQueryTableColumn::EVENT_UNREGISTER_QUERY:
        WriteEvent(query, slot_desc, slot, QueryEvent::UNREGISTER_QUERY);
        break;
      case TQueryTableColumn::READ_IO_WAIT_TOTAL_MS:
        WriteDecimalSlot(slot_desc->type(),
            query.read_io_wait_time_total / NANOS_TO_MILLIS, slot);
        break;
      case TQueryTableColumn::READ_IO_WAIT_MEAN_MS:
        WriteDecimalSlot(slot_desc->type(),
            query.read_io_wait_time_mean / NANOS_TO_MILLIS, slot);
        break;
      case TQueryTableColumn::BYTES_READ_CACHE_TOTAL:
        WriteBigIntSlot(query.bytes_read_cache_total, slot);
        break;
      case TQueryTableColumn::BYTES_READ_TOTAL:
        WriteBigIntSlot(query.bytes_read_total, slot);
        break;
      case TQueryTableColumn::PERNODE_PEAK_MEM_MIN:
        // min_element returns end() only for an empty host map; slot left untouched.
        if (auto min_elem = min_element(query.per_host_state.cbegin(),
                query.per_host_state.cend(), PerHostPeakMemoryComparator);
            LIKELY(min_elem != query.per_host_state.cend())) {
          WriteBigIntSlot(min_elem->second.peak_memory_usage, slot);
        }
        break;
      case TQueryTableColumn::PERNODE_PEAK_MEM_MAX:
        // max_element returns end() only for an empty host map; slot left untouched.
        if (auto max_elem = max_element(query.per_host_state.cbegin(),
                query.per_host_state.cend(), PerHostPeakMemoryComparator);
            LIKELY(max_elem != query.per_host_state.cend())) {
          WriteBigIntSlot(max_elem->second.peak_memory_usage, slot);
        }
        break;
      case TQueryTableColumn::PERNODE_PEAK_MEM_MEAN:
        if (LIKELY(!query.per_host_state.empty())) {
          int64_t peak_mem_sum = 0;
          for (const auto& host : query.per_host_state) {
            peak_mem_sum += host.second.peak_memory_usage;
          }
          // Divide by a signed count. Dividing int64_t by size_t would convert the
          // sum to unsigned and corrupt the result if it were ever negative.
          const int64_t host_count = static_cast<int64_t>(query.per_host_state.size());
          WriteBigIntSlot(peak_mem_sum / host_count, slot);
        }
        break;
      case TQueryTableColumn::SQL:
        RETURN_IF_ERROR(WriteStringSlot(query.redacted_sql, pool, slot));
        break;
      case TQueryTableColumn::PLAN:
        // Drop leading newlines from the plan text.
        RETURN_IF_ERROR(WriteStringSlot(
            trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot));
        break;
      case TQueryTableColumn::TABLES_QUERIED:
        if (!query.tables.empty()) {
          RETURN_IF_ERROR(WriteStringSlot(PrintTableList(query.tables), pool, slot));
        }
        break;
      // The following columns are additionally gated by IncludeField()
      // (NOTE(review): presumably tied to the workload-management schema version in
      // use; confirm in workloadmgmt).
      case TQueryTableColumn::SELECT_COLUMNS:
        if (!query.select_columns.empty()
            && LIKELY(IncludeField(TQueryTableColumn::type::SELECT_COLUMNS))) {
          RETURN_IF_ERROR(WriteStringSlot(join(query.select_columns, ","), pool, slot));
        }
        break;
      case TQueryTableColumn::WHERE_COLUMNS:
        if (!query.where_columns.empty()
            && LIKELY(IncludeField(TQueryTableColumn::type::WHERE_COLUMNS))) {
          RETURN_IF_ERROR(WriteStringSlot(join(query.where_columns, ","), pool, slot));
        }
        break;
      case TQueryTableColumn::JOIN_COLUMNS:
        if (!query.join_columns.empty()
            && LIKELY(IncludeField(TQueryTableColumn::type::JOIN_COLUMNS))) {
          RETURN_IF_ERROR(WriteStringSlot(join(query.join_columns, ","), pool, slot));
        }
        break;
      case TQueryTableColumn::AGGREGATE_COLUMNS:
        if (!query.aggregate_columns.empty()
            && LIKELY(IncludeField(TQueryTableColumn::type::AGGREGATE_COLUMNS))) {
          RETURN_IF_ERROR(
              WriteStringSlot(join(query.aggregate_columns, ","), pool, slot));
        }
        break;
      case TQueryTableColumn::ORDERBY_COLUMNS:
        if (!query.orderby_columns.empty()
            && LIKELY(IncludeField(TQueryTableColumn::type::ORDERBY_COLUMNS))) {
          RETURN_IF_ERROR(WriteStringSlot(join(query.orderby_columns, ","), pool, slot));
        }
        break;
      case TQueryTableColumn::COORDINATOR_SLOTS:
        if (LIKELY(IncludeField(TQueryTableColumn::type::COORDINATOR_SLOTS))) {
          WriteBigIntSlot(record.coordinator_slots, slot);
        }
        break;
      case TQueryTableColumn::EXECUTOR_SLOTS:
        if (LIKELY(IncludeField(TQueryTableColumn::type::EXECUTOR_SLOTS))) {
          WriteBigIntSlot(record.executor_slots, slot);
        }
        break;
      default:
        // A column this binary does not know about (e.g. the table schema is newer
        // than this daemon) is emitted as NULL rather than failing the scan.
        LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << ") added"
            " to table " << table_name_ << "; check if a coordinator was upgraded";
        tuple->SetNull(slot_desc->null_indicator_offset());
        break;
    }
  }

  query_records_.pop_front();
  if (query_records_.empty()) eos_ = true;
  return Status::OK();
}