size_t fill_row_buffer_from_column_buffer()

in src/toolkits/ml_data_2/data_storage/ml_data_row_format.cpp [145:544]


size_t fill_row_buffer_from_column_buffer(
    std::vector<size_t>& row2data_idx_map,
    row_data_block& block_output,
    const row_metadata& rm,
    const std::vector<std::vector<flexible_type> >& column_buffers,
    size_t thread_idx,
    bool track_statistics,
    bool immutable_metadata,
    missing_value_action none_action,
    const std::vector<size_t>& index_remapping) {

  if(track_statistics) {
    DASSERT_MSG(!immutable_metadata,
                "Dynamic metadata must be allowed if statistics are tracked.");
  }


#ifndef NDEBUG
  DASSERT_EQ(rm.total_num_columns, column_buffers.size());
  {
    size_t check_column_buffer_size = 0;

    for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      if(!rm.metadata_vect[c_idx]->is_untranslated_column()) {
        check_column_buffer_size = column_buffers[c_idx].size();
        break;
      }
    }

    for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const auto& c  = column_buffers[c_idx];
      if(rm.metadata_vect[c_idx]->is_untranslated_column())
        DASSERT_EQ(c.size(), 0);
      else
        DASSERT_EQ(c.size(), check_column_buffer_size);
    }
  }
#endif


  // How many rows in the block?
  size_t block_size = size_t(-1);
  for(size_t c_idx = 0; c_idx < column_buffers.size(); ++c_idx) {
    if(!rm.metadata_vect[c_idx]->is_untranslated_column()) {
      block_size = column_buffers[c_idx].size();
      break;
    }
  }

  /** If they are all untranslated columns, then we're done here... */
  if(block_size == size_t(-1))
    return 0;

  DASSERT_TRUE(block_size != 0);

  // Set up the row2data_idx_map
  row2data_idx_map.resize(block_size);

  // Column_buffers for doing the metadata stats indexing stuff.
  std::vector<size_t> index_vect;
  std::vector<double> value_vect;
  std::vector<std::pair<size_t, double> > idx_value_vect;

  size_t max_row_size = 0;

  block_output.entry_data.clear();

  if(rm.data_size_is_constant)
    block_output.entry_data.reserve(rm.constant_data_size * block_size);

  for(size_t _row_buffer_index = 0; _row_buffer_index < block_size; ++_row_buffer_index) {

    ////////////////////////////////////////////////////////////////////////////////
    // Get ready to fill this by defining the row size

    size_t row_size = 0;

    // If the rows are not a constant size, then this writes out the
    // proper row size as the first element.
    size_t n_row_elements_write_index = size_t(-1);

    // record the index of the start of this row.
    row2data_idx_map[_row_buffer_index] = block_output.entry_data.size();

    if(!rm.data_size_is_constant) {
      n_row_elements_write_index = block_output.entry_data.size();
      entry_value ev;
      block_output.entry_data.push_back(ev);
    }

    // Possible remap the row.  This happens if we will be sorting
    // rows later on.
    size_t row_buffer_index = (index_remapping.empty()
                               ? _row_buffer_index
                               : index_remapping[_row_buffer_index]);

    // Okay, now go through the columns and unpack the values.  Make
    // sure they are written correctly.
    for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {

      const flexible_type& v = column_buffers[c_idx][row_buffer_index];

      const column_metadata_ptr& m                      = rm.metadata_vect[c_idx];
      const std::shared_ptr<column_indexer>& m_idx      = m->indexer;
      const std::shared_ptr<column_statistics>& m_stats = m->statistics;

      /** Call this to write out an index.
       */
      auto write_index = [&](size_t index) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.index_value = index;
        block_output.entry_data.push_back(ev);
      };

      /** Call this to write out a value.
       */
      auto write_value = [&](double value) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.double_value = value;
        block_output.entry_data.push_back(ev);
      };


      /** Call this to write out a size.
       */
      auto write_size = [&](size_t size) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        entry_value ev;
        ev.index_value = size;
        block_output.entry_data.push_back(ev);
      };


      /** Call this to write out an index/double pair.
       */
      auto write_index_value_pair = [&](std::pair<size_t, double> p) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.index_value = p.first;
        block_output.entry_data.push_back(ev);
        ev.double_value = p.second;
        block_output.entry_data.push_back(ev);
      };


      /** Call this to throw a NA error.
       */
      auto bad_missing_value_encountered = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {

        log_and_throw(
            std::string("Missing value (None) encountered in ")
            + "column '" + m->name + "'. "
            + "Use the SFrame's dropna function to drop rows with 'None' values in them.");
      };


      /** Use this to retrieve a missing numeric value.
       */
      auto get_missing_numeric_value = [&](size_t feature_index) -> double GL_GCC_ONLY(GL_HOT_NOINLINE) {
        switch(none_action){
          case missing_value_action::ERROR:
          bad_missing_value_encountered();
          return 0;
          case missing_value_action::IMPUTE:
          return m_stats->mean(feature_index);
          default:
          ASSERT_TRUE(false);
          return 0;
        }
      };

      /** Use this to check that missing dicts and missing
       *  categorical_vectors are handled correctly.
       */
      auto verify_missing_categoricals_okay = [&]() GL_GCC_ONLY(GL_HOT_INLINE) {
        switch(none_action){
          case missing_value_action::ERROR:
          bad_missing_value_encountered();
          default:
          return;
        }
      };

      ////////////////////////////////////////////////////////////////////////////////
      // Step 3: Go through and write out the data using the above functions

      switch(rm.metadata_vect[c_idx]->mode) {
        case ml_column_mode::NUMERIC:
          {
            double v_d;

            // Missing value entries for numeric columns.
            if(UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
              v_d = get_missing_numeric_value(0);
            } else {
              v_d = v;
            }

            /**  Write out the data in the proper format. **/
            write_value(v_d);

            /**  Update Statistics  **/
            if(track_statistics) {
              value_vect = {v_d};
              m_stats->update_numeric_statistics(thread_idx, value_vect);
            }

            break;
          }

        case ml_column_mode::NUMERIC_VECTOR:
          {

            if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED) ) {

              // Top level None: An entire entry is missing.
              size_t n_values = m->fixed_column_size();

              for(size_t k = 0; k < n_values; ++k)
                write_value(get_missing_numeric_value(k));

            } else {

              const std::vector<double>& feature_vect = v.get<flex_vec>();

              /**  Write out the data in the proper format. **/
              size_t n_values = feature_vect.size();

              for(size_t k = 0; k < n_values; ++k)
                write_value(feature_vect[k]);

              /**  Update Statistics  **/
              m->check_fixed_column_size(v);

              if(track_statistics)
                m_stats->update_numeric_statistics(thread_idx, feature_vect);
            }

            break;
          }

        case ml_column_mode::CATEGORICAL:
          {
            /**  Map out the value.  **/

            // Missing values are always treated as their own category.
            size_t index;

            if(!immutable_metadata) {
              index = m_idx->map_value_to_index(thread_idx, v);
            } else {
              index = m_idx->immutable_map_value_to_index(v);
            }

            /**  Write out the data in the proper format. **/
            write_index(index);

            /**  Update Statistics  **/
            if(track_statistics) {
              index_vect = {index};
              m_stats->update_categorical_statistics(thread_idx, index_vect);
            }

            break;
          }

        case ml_column_mode::CATEGORICAL_VECTOR:
          {

            // Top level categorical vector is missing.
            if(UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {

              verify_missing_categoricals_okay();
              write_size(0);

            } else {

              const flex_list& vv = v.get<flex_list>();
              size_t n_values = vv.size();

              index_vect.resize(n_values);

              for(size_t k = 0; k < n_values; ++k) {
                if(!immutable_metadata) {
                  index_vect[k] = m_idx->map_value_to_index(thread_idx, vv[k]);
                } else {
                  index_vect[k] = m_idx->immutable_map_value_to_index(vv[k]);
                }
              }

              // Now, we want to sort the dictionaries by index; this
              // permits easy filling of an Eigen sparse vector when
              // the data is loaded, as we can insert it by index
              // order.
              std::sort(index_vect.begin(), index_vect.end());

              /**  Write out the data in the proper format. **/
              write_size(n_values);
              for(size_t k = 0; k < n_values; ++k)
                write_index(index_vect[k]);

              /**  Update Statistics  **/
              if(track_statistics)
                m_stats->update_categorical_statistics(thread_idx, index_vect);
            }

            break;
          }

        case ml_column_mode::DICTIONARY:
          {

            // Top level dictionary is missing
            if (v.get_type() == flex_type_enum::UNDEFINED) {

              verify_missing_categoricals_okay();
              write_size(0);

            } else {

              const flex_dict& dv = v.get<flex_dict>();
              size_t n_values = dv.size();

              idx_value_vect.resize(n_values);

              for(size_t k = 0; k < n_values; ++k) {
                const std::pair<flexible_type, flexible_type>& kvp = dv[k];

                size_t index;
                if(!immutable_metadata) {
                  index = m_idx->map_value_to_index(thread_idx, kvp.first);
                } else {
                  index = m_idx->immutable_map_value_to_index(kvp.first);
                }

                double value;

                if(kvp.second.get_type() == flex_type_enum::INTEGER
                   || kvp.second.get_type() == flex_type_enum::FLOAT){

                  value = kvp.second;

                } else if  (kvp.second.get_type() == flex_type_enum::UNDEFINED){

                  value = get_missing_numeric_value(index);

                } else {

                  auto throw_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
                    log_and_throw(std::string("Dictionary value for key '")
                                  + std::string(kvp.first)
                                  + "' in column '"
                                  + m->name
                                  + "' is not numeric.");
                  };

                  throw_error();
                }

                idx_value_vect[k] = {index, value};
              }

              // Now, we want to sort the dictionaries by index; this
              // permits easy filling of an Eigen sparse vector when
              // the data is loaded, as we can insert it by index
              // order.
              std::sort(idx_value_vect.begin(), idx_value_vect.end());

              /**  Write out the data in the proper format. **/
              write_size(n_values);
              for(size_t k = 0; k < n_values; ++k)
                write_index_value_pair(idx_value_vect[k]);

              /**  Update Statistics  **/
              if(track_statistics)
                m_stats->update_dict_statistics(thread_idx, idx_value_vect);

              break;
            }

            case ml_column_mode::UNTRANSLATED:
              break;
          }
      } // end switch over column mode
    } // End loop over columns

    if(!rm.data_size_is_constant) {
      DASSERT_TRUE(n_row_elements_write_index != size_t(-1));
      block_output.entry_data[n_row_elements_write_index].index_value
          = block_output.entry_data.size() - n_row_elements_write_index;
    }

    // Check the maximimum row size.
    max_row_size = std::max(max_row_size, row_size);

  } // End loop over rows in buffer

  return max_row_size;
}