size_t fill_row_buffer_from_column_buffer()

in src/ml/ml_data/data_storage/ml_data_row_format.cpp [385:815]


/**
 * Translates one block of column-major raw values into the packed row-major
 * entry format stored in a row_data_block.
 *
 * For every row r of the block, row2data_idx_map[r] receives the offset in
 * block_output.entry_data at which that row starts.  The row is then written
 * as one run of entries per translated column, according to the column mode:
 *
 *  - NUMERIC:              one double value.
 *  - NUMERIC_VECTOR /
 *    NUMERIC_ND_VECTOR:    one double per element (ND vectors are written in
 *                          canonical element order).
 *  - CATEGORICAL:          one index (a missing value is mapped like any other
 *                          value, i.e. as its own category).
 *  - CATEGORICAL_VECTOR:   a size entry followed by the indices, sorted.
 *  - DICTIONARY:           a size entry followed by (index, value) pairs,
 *                          sorted by index.
 *  - CATEGORICAL_SORTED /
 *    UNTRANSLATED:         nothing.
 *
 * If rm.data_size_is_constant is false, each row is prefixed by one extra
 * entry.  After the row is written, that entry is back-filled with the number
 * of entries the row occupies, counting the prefix entry itself.
 *
 * \param row2data_idx_map   Output.  Resized to the block size and filled with
 *                           the starting entry offset of each row.
 * \param block_output       Output.  Its entry_data is cleared and refilled.
 * \param rm                 Row metadata describing every column.
 * \param column_buffers     One buffer per column.  Untranslated columns must
 *                           have empty buffers.  All other buffers must share
 *                           the same length, which is the block size.
 * \param thread_idx         Thread slot forwarded to the indexers and
 *                           statistics objects.
 * \param track_statistics   If true, column statistics are updated with every
 *                           non-missing value.  Requires !immutable_metadata.
 * \param immutable_metadata If true, values are mapped with
 *                           immutable_map_value_to_index instead of
 *                           map_value_to_index.
 * \param none_action        Handling of missing (None) numeric values and of
 *                           missing top-level lists/dictionaries:
 *                             - ERROR throws.
 *                             - IMPUTE substitutes the statistics' mean
 *                               (numeric values only).
 *                             - USE_NAN writes NaN (numeric values only).
 *                           For ERROR on lists/dictionaries, a missing entry is
 *                           written as an empty (size 0) entry instead.
 *
 * \return The largest number of features written for any row of the block, or
 *         0 if every column is untranslated.  Each value, index, or dictionary
 *         pair counts as one feature; size entries are not counted.
 *
 * Throws (via log_and_throw) when a missing value is seen under
 * ml_missing_value_action::ERROR, or when a dictionary value is not numeric.
 */
size_t fill_row_buffer_from_column_buffer(
    std::vector<size_t>& row2data_idx_map, row_data_block& block_output,
    const row_metadata& rm,
    const std::vector<std::vector<flexible_type> >& column_buffers,
    size_t thread_idx, bool track_statistics, bool immutable_metadata,
    ml_missing_value_action none_action) {
  if (track_statistics) {
    DASSERT_MSG(!immutable_metadata,
                "Dynamic metadata must be allowed if statistics are tracked.");
  }

#ifndef NDEBUG
  DASSERT_EQ(rm.total_num_columns, column_buffers.size());
  {
    size_t check_column_buffer_size = 0;

    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      if (!rm.metadata_vect[c_idx]->is_untranslated_column()) {
        check_column_buffer_size = column_buffers[c_idx].size();
        break;
      }
    }

    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const auto& c = column_buffers[c_idx];
      if (rm.metadata_vect[c_idx]->is_untranslated_column())
        DASSERT_EQ(c.size(), 0);
      else
        DASSERT_EQ(c.size(), check_column_buffer_size);
    }
  }
#endif

  // How many rows are in the block?  Untranslated columns carry no data in
  // column_buffers, so use the length of the first translated column.
  size_t block_size = size_t(-1);
  for (size_t c_idx = 0; c_idx < column_buffers.size(); ++c_idx) {
    if (!rm.metadata_vect[c_idx]->is_untranslated_column()) {
      block_size = column_buffers[c_idx].size();
      break;
    }
  }

  // If every column is untranslated, there is nothing to write.
  if (block_size == size_t(-1)) return 0;

  DASSERT_TRUE(block_size != 0);

  // Set up the row2data_idx_map.
  row2data_idx_map.resize(block_size);

  // Scratch buffers for metadata/statistics updates.  They are reused across
  // rows to avoid per-row allocations.
  std::vector<size_t> index_vect;
  std::vector<double> value_vect;
  std::vector<std::pair<size_t, double> > idx_value_vect;
  std::vector<size_t> exclusion_indices;

  size_t max_row_size = 0;

  block_output.entry_data.clear();

  if (rm.data_size_is_constant)
    block_output.entry_data.reserve(rm.constant_data_size * block_size);

  for (size_t row_buffer_index = 0; row_buffer_index < block_size;
       ++row_buffer_index) {
    ////////////////////////////////////////////////////////////////////////////////
    // Get ready to fill this row.

    // Number of features written for this row (size entries excluded).
    size_t row_size = 0;

    // If rows are not of constant size, the first entry of the row is a
    // placeholder that is back-filled with the row's entry count below.
    size_t n_row_elements_write_index = size_t(-1);

    // Record the index of the start of this row.
    row2data_idx_map[row_buffer_index] = block_output.entry_data.size();

    if (!rm.data_size_is_constant) {
      n_row_elements_write_index = block_output.entry_data.size();
      entry_value ev;
      block_output.entry_data.push_back(ev);
    }

    // Go through the columns and unpack the values.
    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const column_metadata_ptr& m = rm.metadata_vect[c_idx];

      // Untranslated columns have empty buffers (checked in debug builds
      // above) and write nothing to the row.  They must be skipped before
      // indexing column_buffers[c_idx], because subscripting an empty vector
      // is undefined behavior.
      if (m->is_untranslated_column()) continue;

      const flexible_type& v = column_buffers[c_idx][row_buffer_index];

      const std::shared_ptr<column_indexer>& m_idx = m->indexer;
      const std::shared_ptr<column_statistics>& m_stats = m->statistics;

      /** Writes an index entry; counts as one feature.
       */
      auto write_index = [&](size_t index) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.index_value = index;
        block_output.entry_data.push_back(ev);
      };

      /** Writes a double-valued entry; counts as one feature.
       */
      auto write_value = [&](double value) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.double_value = value;
        block_output.entry_data.push_back(ev);
      };

      /** Writes a size entry; does not count as a feature.
       */
      auto write_size = [&](size_t size) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        entry_value ev;
        ev.index_value = size;
        block_output.entry_data.push_back(ev);
      };

      /** Writes an index entry followed by a value entry; the pair counts as
       *  one feature.
       */
      auto write_index_value_pair = [&](std::pair<size_t, double> p)
                                        GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
                                          ++row_size;
                                          entry_value ev;
                                          ev.index_value = p.first;
                                          block_output.entry_data.push_back(ev);
                                          ev.double_value = p.second;
                                          block_output.entry_data.push_back(ev);
                                        };

      /** Throws the missing-value (None) error for the current column.
       */
      auto bad_missing_value_encountered = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {

        log_and_throw(std::string("Missing value (None) encountered in ") +
                      "column '" + m->name + "'. " +
                      "Use the SFrame's dropna function to drop rows with "
                      "'None' values in them.");
      };

      /** Returns the value to write in place of a missing numeric value at
       *  the given feature index, according to none_action.
       */
      auto get_missing_numeric_value =
          [&](size_t feature_index) -> double GL_GCC_ONLY(GL_HOT_NOINLINE) {
        switch (none_action) {
          case ml_missing_value_action::ERROR:
            bad_missing_value_encountered();
            return 0;
          case ml_missing_value_action::IMPUTE:
            return m_stats->mean(feature_index);
          case ml_missing_value_action::USE_NAN:
            return NAN;
          default:
            ASSERT_TRUE(false);
            return 0;
        }
      };

      /** Called when an entire categorical vector or dictionary is missing.
       *  Throws under ERROR; otherwise the caller writes an empty entry.
       */
      auto verify_missing_categoricals_okay = [&]() GL_GCC_ONLY(GL_HOT_INLINE) {
        if (none_action == ml_missing_value_action::ERROR) {
          bad_missing_value_encountered();
        }
      };

      ////////////////////////////////////////////////////////////////////////////////
      // Write out the data using the helpers above.

      switch (m->mode) {
        case ml_column_mode::NUMERIC: {
          double v_d;

          // Missing value entries for numeric columns.
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            v_d = get_missing_numeric_value(0);
          } else {
            v_d = v;

            /**  Update Statistics  **/
            if (track_statistics) {
              value_vect = {v_d};
              m_stats->update_numeric_statistics(thread_idx, value_vect);
            }
          }

          /**  Write out the data in the proper format. **/
          write_value(v_d);

          break;
        }

        case ml_column_mode::NUMERIC_VECTOR:
        case ml_column_mode::NUMERIC_ND_VECTOR: {
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            // Top level None: an entire entry is missing.
            size_t n_values = m->fixed_column_size();

            for (size_t k = 0; k < n_values; ++k) {
              write_value(get_missing_numeric_value(k));
            }
          } else if (v.get_type() == flex_type_enum::VECTOR) {
            const std::vector<double>& feature_vect = v.get<flex_vec>();

            m->check_fixed_column_size(v);

            /**  Write out the data in the proper format. **/
            size_t n_values = feature_vect.size();

            for (size_t k = 0; k < n_values; ++k) {
              write_value(feature_vect[k]);
            }

            /**  Update Statistics  **/
            if (track_statistics) {
              m_stats->update_numeric_statistics(thread_idx, feature_vect);
            }
          } else if (v.get_type() == flex_type_enum::ND_VECTOR) {
            // Validate the size before writing anything, as the VECTOR
            // branch does.
            m->check_fixed_column_size(v);

            const flex_nd_vec& _a = v.get<flex_nd_vec>();
            flex_nd_vec _a_canonical;  // Temporary to make sure the
                                       // references stay valid
            const flex_nd_vec& a =
                _a.is_canonical() ? _a : (_a_canonical = _a.canonicalize());

            size_t n_values = a.num_elem();

            for (size_t i = 0; i < n_values; ++i) {
              write_value(a[i]);
            }

            /**  Update Statistics  **/
            if (track_statistics) {
              m_stats->update_numeric_statistics(thread_idx, a.raw_elements());
            }
          } else {
            ASSERT_TRUE(false);
          }

          break;
        }

        case ml_column_mode::CATEGORICAL: {
          /**  Map out the value.  **/

          // Missing values are always treated as their own category.
          size_t index;

          if (!immutable_metadata) {
            index = m_idx->map_value_to_index(thread_idx, v);
          } else {
            index = m_idx->immutable_map_value_to_index(v);
          }

          /**  Write out the data in the proper format. **/
          write_index(index);

          /**  Update Statistics  **/
          if (track_statistics) {
            index_vect = {index};
            m_stats->update_categorical_statistics(thread_idx, index_vect);
          }

          break;
        }

        case ml_column_mode::CATEGORICAL_VECTOR: {
          // Top level categorical vector is missing.
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            verify_missing_categoricals_okay();
            write_size(0);

          } else {
            const flex_list& vv = v.get<flex_list>();
            size_t n_values = vv.size();

            index_vect.resize(n_values);

            for (size_t k = 0; k < n_values; ++k) {
              if (!immutable_metadata) {
                index_vect[k] = m_idx->map_value_to_index(thread_idx, vv[k]);
              } else {
                index_vect[k] = m_idx->immutable_map_value_to_index(vv[k]);
              }
            }

            // Sort by index.  This permits easy filling of an Eigen sparse
            // vector when the data is loaded, since entries can then be
            // inserted in index order.
            std::sort(index_vect.begin(), index_vect.end());

            /**  Write out the data in the proper format. **/
            write_size(n_values);
            for (size_t k = 0; k < n_values; ++k) write_index(index_vect[k]);

            /**  Update Statistics  **/
            if (track_statistics)
              m_stats->update_categorical_statistics(thread_idx, index_vect);
          }

          break;
        }

        case ml_column_mode::DICTIONARY: {
          DASSERT_TRUE(exclusion_indices.empty());

          // Top level dictionary is missing.
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            verify_missing_categoricals_okay();
            write_size(0);

          } else {
            const flex_dict& dv = v.get<flex_dict>();
            size_t n_values = dv.size();

            idx_value_vect.resize(n_values);

            for (size_t k = 0; k < n_values; ++k) {
              const std::pair<flexible_type, flexible_type>& kvp = dv[k];

              size_t index;
              if (!immutable_metadata) {
                index = m_idx->map_value_to_index(thread_idx, kvp.first);
              } else {
                index = m_idx->immutable_map_value_to_index(kvp.first);
              }

              double value = 0;

              if (kvp.second.get_type() == flex_type_enum::INTEGER ||
                  kvp.second.get_type() == flex_type_enum::FLOAT) {
                value = kvp.second;

              } else if (kvp.second.get_type() == flex_type_enum::UNDEFINED) {
                // Missing values are written out, but they must not feed
                // into the statistics; remember which indices to exclude.
                value = get_missing_numeric_value(index);
                exclusion_indices.push_back(index);

              } else {
                auto throw_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
                  log_and_throw(std::string("Dictionary value for key '") +
                                std::string(kvp.first) + "' in column '" +
                                m->name + "' is not numeric.");
                };

                throw_error();
              }

              idx_value_vect[k] = {index, value};
            }

            // Sort by index.  This permits easy filling of an Eigen sparse
            // vector when the data is loaded, since entries can then be
            // inserted in index order.
            std::sort(idx_value_vect.begin(), idx_value_vect.end());

            /**  Write out the data in the proper format. **/
            write_size(n_values);
            for (size_t k = 0; k < n_values; ++k)
              write_index_value_pair(idx_value_vect[k]);

            /**  Update Statistics  **/
            if (track_statistics) {
              if (!exclusion_indices.empty()) {
                // Efficiently remove the values recorded above as missing.
                auto rm_missing_values = [&]() GL_GCC_ONLY(
                                             GL_HOT_NOINLINE_FLATTEN) {

                  // Fast track the common single-occurrence case.
                  if (exclusion_indices.size() == 1) {
                    size_t rm_index = exclusion_indices[0];
                    auto it = std::remove_if(
                        idx_value_vect.begin(), idx_value_vect.end(),
                        [&](const std::pair<size_t, double>& p) {
                          return p.first == rm_index;
                        });
                    idx_value_vect.erase(it, idx_value_vect.end());
                  } else {
                    std::sort(exclusion_indices.begin(),
                              exclusion_indices.end());
                    auto it = std::remove_if(
                        idx_value_vect.begin(), idx_value_vect.end(),
                        [&](const std::pair<size_t, double>& p) {

                          // Is it in the exclusion_indices?
                          auto fit = std::lower_bound(exclusion_indices.begin(),
                                                      exclusion_indices.end(),
                                                      p.first);

                          return (fit != exclusion_indices.end() &&
                                  *fit == p.first);
                        });
                    idx_value_vect.erase(it, idx_value_vect.end());
                  }
                };
                rm_missing_values();
              }

              m_stats->update_dict_statistics(thread_idx, idx_value_vect);
            }

            // Reset unconditionally, whether or not statistics are tracked,
            // so that missing-key indices never carry over into the next
            // dictionary (and the DASSERT at the top of this case holds).
            exclusion_indices.clear();
          }
          break;
        }
        case ml_column_mode::CATEGORICAL_SORTED:
        case ml_column_mode::UNTRANSLATED:
          break;

      }  // end switch over column mode
    }    // End loop over columns

    if (!rm.data_size_is_constant) {
      DASSERT_TRUE(n_row_elements_write_index != size_t(-1));
      // Back-fill the row prefix with the number of entries in this row,
      // including the prefix entry itself.
      block_output.entry_data[n_row_elements_write_index].index_value =
          block_output.entry_data.size() - n_row_elements_write_index;
    }

    // Track the maximum row size.
    max_row_size = std::max(max_row_size, row_size);

  }  // End loop over rows in buffer

  return max_row_size;
}