void ml_data_side_features::add_and_index_side_data()

in src/toolkits/ml_data_2/side_features.cpp [39:434]


/**
 * Adds one table of side information and indexes it by its join column.
 *
 * The join column is `forced_join_column` when that is non-empty (it must be
 * present in both the main data and the side data). Otherwise it is the
 * unique column of `unindexed_side_sframe` whose name also appears in the
 * main data. Finding zero such columns, or more than one, is an error.
 *
 * Training mode:
 *   - Column names are optionally uniquified ("uniquify_side_column_names").
 *   - Fresh column metadata is set up for every non-join column.
 *   - That metadata is appended to `_full_metadata`.
 *   - Column statistics are tracked while the rows are indexed.
 *
 * Otherwise, the side data is validated against the schema recorded at
 * training time:
 *   - Names are remapped through `column_name_map`.
 *   - Missing columns raise.
 *   - Extra columns, and side data on a column that had none at setup,
 *     either raise or are discarded with a warning, depending on
 *     "ignore_new_columns_after_train".
 *
 * The rows are then copied, in parallel, into newly allocated
 * `row_data_block`s appended to `raw_row_storage`. For every row whose
 * indexed join value fits in `data_lookup_map`, the entry at that index is
 * pointed at the start of the row. `max_row_size` is updated to the largest
 * row size reported by `fill_row_buffer_from_column_buffer`.
 *
 * Early returns (no-ops):
 *   - The side data has no columns.
 *   - Outside training mode, the side data has no rows.
 *   - The side data has only the join column (a warning is logged).
 *
 * \param unindexed_side_sframe  Raw side data, including the join column.
 * \param mode_overrides         Per-column mode overrides, used when setting
 *                               up metadata in training mode.
 * \param options                Option map. This function reads
 *                               "uniquify_side_column_names" (training) and
 *                               "ignore_new_columns_after_train" (otherwise).
 *                               It is also forwarded to metadata setup and
 *                               get_missing_value_action.
 * \param training_mode          If true, build new metadata for this side data.
 * \param immutable_metadata     Forwarded to the indexing routines. Its
 *                               negation is passed to map_to_indexed_sframe.
 * \param forced_join_column     If non-empty, the column to join on.
 *
 * Errors are reported through log_and_throw.
 */
void ml_data_side_features::add_and_index_side_data(
    sframe unindexed_side_sframe,
    const std::map<std::string, ml_column_mode>& mode_overrides,
    const std::map<std::string, flexible_type>& options,
    bool training_mode,
    bool immutable_metadata,
    const std::string& forced_join_column) {

  // Nothing to add: no columns at all, or (outside training) no rows.
  if(unindexed_side_sframe.num_columns() == 0
     || (!training_mode && unindexed_side_sframe.num_rows() == 0))
    return;

  ////////////////////////////////////////////////////////////////////////////////

  // First find the column we need to join on.
  size_t side_join_column_index = size_t(-1);
  size_t main_join_column_index = size_t(-1);
  std::string join_column_name;

  if(!forced_join_column.empty()) {

    auto it = main_column_name_lookup.find(forced_join_column);

    if(it == main_column_name_lookup.end()) {
      log_and_throw(std::string("Join of side information requested on column ")
                    + forced_join_column + ", but this column is not present in the main data.");
    }

    main_join_column_index = it->second;

    if(!unindexed_side_sframe.contains_column(forced_join_column)) {
      log_and_throw(std::string("Join of side information requested on column ")
                    + forced_join_column + ", but this column is not present in the side data.");
    }

    join_column_name = forced_join_column;
    side_join_column_index = unindexed_side_sframe.column_index(join_column_name);

  } else {
    // Auto-detect: exactly one side column may share its name with a
    // column of the main data.
    for(size_t i = 0; i < unindexed_side_sframe.num_columns(); ++i) {
      const std::string& column_name = unindexed_side_sframe.column_name(i);

      auto it = main_column_name_lookup.find(column_name);

      if(it != main_column_name_lookup.end()) {
        if(side_join_column_index != size_t(-1)) {
          log_and_throw(std::string("Join of side information attempted on both ")
                        + join_column_name + " and "
                        + column_name + "; joining must currently be on a single column.");
        }
        side_join_column_index = i;
        main_join_column_index = it->second;
        join_column_name = column_name;
      }
    }
  }

  if(side_join_column_index == size_t(-1))
    log_and_throw(std::string("No column found to join on. Exactly one column name "
                              "must match a column name in the main data to determine the join."));

  if(unindexed_side_sframe.num_columns() == 1) {
    logprogress_stream << "WARNING: No additional columns provided in side information for feature "
                       << join_column_name << "; ignoring." << std::endl;
    return;
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Now, do we need a new schema for the side information

  column_side_info& si = side_lookups[main_join_column_index];

  std::vector<std::string> unjoined_names;

  if(training_mode) {

    // First test if we need to uniquify any of the column names.
    if(options.at("uniquify_side_column_names")) {
      uniquify_side_column_names(unindexed_side_sframe, si.column_name_map, join_column_name);
    }

    // Build up the list of side column names
    for(size_t i = 0; i < unindexed_side_sframe.num_columns(); ++i) {
      const std::string& column_name = unindexed_side_sframe.column_name(i);
      if(column_name != join_column_name)
        unjoined_names.push_back(column_name);
    }

    // Now construct the metadata for every non-join column.
    std::vector<column_metadata_ptr> metadata_vect;

    for(size_t i = 0; i < unindexed_side_sframe.num_columns(); ++i) {
      if(i != side_join_column_index) {
        column_metadata_ptr cm(new column_metadata);
        cm->setup(
            false,
            unindexed_side_sframe.column_name(i),
            unindexed_side_sframe.select_column(i),
            mode_overrides,
            options);

        if(cm->is_untranslated_column())
          ASSERT_MSG(false, "Untranslated columns are not allowed in the side information.");

        metadata_vect.push_back(cm);
      }
    }

    si.rm.setup(metadata_vect, false);

    // The side columns occupy a contiguous range of the global column
    // numbering, starting at current_column_index.
    DASSERT_EQ(current_column_index, _full_metadata.size());
    _full_metadata.insert(_full_metadata.end(), si.rm.metadata_vect.begin(), si.rm.metadata_vect.end());

    si.column_index_start = current_column_index;
    current_column_index += si.rm.metadata_vect.size();

    // Zero out the maximum row size
    si.max_row_size = 0;

  } else {

    // Remap the column names if applicable (mirrors the uniquification
    // done at training time).
    if(!si.column_name_map.empty()) {
      for(size_t i = 0; i < unindexed_side_sframe.num_columns(); ++i) {
        auto it = si.column_name_map.find(unindexed_side_sframe.column_name(i));

        if(it != si.column_name_map.end()) {
          DASSERT_TRUE(it->first != join_column_name);
          DASSERT_TRUE(it->second != join_column_name);

          unindexed_side_sframe.set_column_name(i, it->second);
        }
      }
    }

    const std::vector<std::string>& raw_column_names = unindexed_side_sframe.column_names();
    std::set<std::string> raw_column_name_set(raw_column_names.begin(), raw_column_names.end());

    std::vector<std::string> missing_columns;

    raw_column_name_set.erase(join_column_name);

    // No schema was recorded for this join column at setup time.
    if(si.rm.metadata_vect.empty()) {
      bool ignore = options.at("ignore_new_columns_after_train");

      std::ostringstream ss;

      ss << "Side data provided on column '" << join_column_name
         << "'; no side data was provided at setup";

      if(ignore) {
        ss << "; Discarding.";
        logstream(LOG_WARNING) << ss.str() << std::endl;
        return;
      } else {
        // Do not embed a trailing newline in the exception message.
        ss << ".";
        log_and_throw(ss.str());
      }
    }

    // Every schema column must be present; whatever remains in
    // raw_column_name_set afterwards is an extra column.
    for(size_t c_idx = 0; c_idx < si.rm.metadata_vect.size(); ++c_idx) {
      const std::string& column_name = si.rm.metadata_vect[c_idx]->name;

      auto it = raw_column_name_set.find(column_name);

      if(it == raw_column_name_set.end())
        missing_columns.push_back(column_name);
      else
        raw_column_name_set.erase(it);
    }

    if(!missing_columns.empty()) {
      std::ostringstream ss;

      ss << "Provided data joined on " << join_column_name << " missing required columns: ";

      // Safe: missing_columns is non-empty here.
      for(size_t i = 0; i < missing_columns.size() - 1; ++i) {
        ss << missing_columns[i] << ", ";
      }
      ss << missing_columns.back() << ".";

      log_and_throw(ss.str());
    }

    if(!raw_column_name_set.empty()) {
      std::ostringstream ss;

      bool ignore = options.at("ignore_new_columns_after_train");

      if(ignore) {
        ss << "Discarding additional columns present in side data on column "
           << join_column_name << " that do not match schema: ";
      } else {
        ss << "Additional columns present in side data on column "
           << join_column_name << " that do not match schema: ";
      }

      {
        size_t i = 0;
        for(const auto& s : raw_column_name_set) {
          ss << s;
          ++i;
          if(i != raw_column_name_set.size())
            ss << ", ";
        }
      }

      ss << ".";

      if(ignore) {
        logstream(LOG_WARNING) << ss.str() << std::endl;
      } else {
        log_and_throw(ss.str());
      }

      for(const std::string& c : raw_column_name_set) {
        unindexed_side_sframe = unindexed_side_sframe.remove_column(
            unindexed_side_sframe.column_index(c));
      }
    }

    // Build up the list of side column names
    for(size_t i = 0; i < unindexed_side_sframe.num_columns(); ++i) {
      const std::string& column_name = unindexed_side_sframe.column_name(i);
      if(column_name != join_column_name)
        unjoined_names.push_back(column_name);
    }
  }

  // Statistics are only gathered when the metadata was freshly built.
  const bool track_statistics = training_mode;
  missing_value_action mva = get_missing_value_action(options, training_mode);

  ////////////////////////////////////////////////////////////////////////////////

  // Get the indexed versions of the join column.  The other we'll
  // compute on the fly.
  column_metadata_ptr join_column_metadata = main_metadata[main_join_column_index];

  sframe side_data_sf = unindexed_side_sframe.select_columns(unjoined_names);

  sframe join_column_sf = map_to_indexed_sframe(
      {join_column_metadata->indexer},
      unindexed_side_sframe.select_columns({join_column_name}),
      !immutable_metadata);

  ////////////////////////////////////////////////////////////////////////////////
  // Make sure that the current data_lookup_map is large enough to
  // hold the current number of categories.  If there are new values in
  // the side data, the above operations may have changed them.

  DASSERT_LE(si.data_lookup_map.size(), main_metadata[main_join_column_index]->column_size());
  si.data_lookup_map.resize(main_metadata[main_join_column_index]->column_size(), nullptr);

  ////////////////////////////////////////////////////////////////////////////////

  // Provide a safe way to get a new block of data.  We now have each
  // thread creating its own block of data; only the registration in
  // raw_row_storage needs to be serialized.
  mutex new_data_acquire_lock;

  auto get_new_data_block = [this, &new_data_acquire_lock]() {
    std::shared_ptr<row_data_block> new_data_block_ptr(new row_data_block);

    new_data_acquire_lock.lock();
    raw_row_storage.push_back(new_data_block_ptr);
    new_data_acquire_lock.unlock();

    return new_data_block_ptr;
  };

  parallel_sframe_iterator_initializer join_column_it_init({join_column_sf, side_data_sf});

  ////////////////////////////////////////////////////////////////////////////////
  // Set up statistics tracking, if needed.

  scoped_finally indexer_finalizer;
  scoped_finally statistics_finalizer;

  // Initialize the indexer (and, in training mode, the statistics) of every
  // side column.  The matching finalize() calls are registered with the
  // scoped_finally objects above and executed explicitly at the end of this
  // function (presumably they also run on scope exit if an exception
  // escapes — scoped_finally semantics are not shown here).
  for(const auto& m : si.rm.metadata_vect) {
    std::shared_ptr<column_indexer> indexer = m->indexer;

    indexer->initialize();
    indexer_finalizer.add([indexer](){indexer->finalize();});

    if(track_statistics) {
      std::shared_ptr<column_statistics> statistics = m->statistics;

      statistics->initialize();
      statistics_finalizer.add([statistics](){statistics->finalize();});
    }
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Track the maximum row size; simple front loading of things for
  // efficient allocation.  Each thread keeps its own maximum and merges
  // it into this shared value when it finishes, so the result does not
  // depend on how many workers in_parallel actually launches.
  size_t max_row_size_all_threads = 0;
  mutex max_row_size_lock;

  const size_t num_columns = side_data_sf.num_columns();

  in_parallel([&](size_t thread_idx, size_t num_threads) {

      // To fill these blocks of data, which is what we are doing
      // here, we sequentially grab blocks of 10000 rows of the data,
      // then keep track of how much is needed for the expanded size
      // while filling the buffer with the raw flexible_type objects.
      // We then allocate a new data block and fill it with the data
      // in the row buffer.
      //
      // The format is given by the internal format described in
      // ml_data_row_format.hpp.

      const size_t row_buffer_size = 10000;
      std::vector<size_t> join_index_buffer(row_buffer_size);
      std::vector<std::vector<flexible_type> > column_buffers(num_columns);
      std::vector<size_t> row2data_idx_map;

      size_t thread_max_row_size = 0;

      parallel_sframe_iterator it(join_column_it_init, thread_idx, num_threads);

      // The main loop over all the data
      while(!it.done()) {

        ////////////////////////////////////////////////////////////////////////////////
        // Move everything from the side data block to the buffer

        size_t rows_in_buffer = 0;

        for(std::vector<flexible_type>& column : column_buffers)
          column.resize(row_buffer_size);

        for(size_t row_buffer_index = 0;
            row_buffer_index < row_buffer_size && !it.done();
            ++row_buffer_index, ++it) {

          // Save the index of the lookup location. it.value(0) is the
          // first column in parallel_sframe_iterator, which here is
          // the joined data column.  With the indexing provided by
          // column_metadata, this is the index.
          join_index_buffer[row_buffer_index] = it.value(0);

          for(size_t c_idx = 0; c_idx < num_columns; ++c_idx) {
            column_buffers[c_idx][row_buffer_index] = it.move_value(1, c_idx);
          }

          ++rows_in_buffer;
        }

        for(std::vector<flexible_type>& column : column_buffers)
          column.resize(rows_in_buffer);

        ////////////////////////////////////////////////////////////////////////////////
        // Put everything from the column_buffers into a block of raw data.

        std::shared_ptr<row_data_block> new_data = get_new_data_block();

        size_t max_row_size =
            fill_row_buffer_from_column_buffer(
                row2data_idx_map,
                *new_data,
                si.rm,
                column_buffers,
                thread_idx,
                track_statistics,
                immutable_metadata,
                mva);

        thread_max_row_size = std::max(thread_max_row_size, max_row_size);

        ////////////////////////////////////////////////////////////////////////////////
        // Now go through and record the start of each row in the
        // location in data_lookup_map given by the join_index.

        for(size_t i = 0; i < rows_in_buffer; ++i) {
          size_t idx = join_index_buffer[i];
          if(idx < si.data_lookup_map.size())
            si.data_lookup_map[idx] = new_data->entry_data.data() + row2data_idx_map[i];
        }

      }

      // Merge this thread's maximum into the shared result.
      max_row_size_lock.lock();
      max_row_size_all_threads = std::max(max_row_size_all_threads, thread_max_row_size);
      max_row_size_lock.unlock();
    });

  si.max_row_size = std::max(si.max_row_size, max_row_size_all_threads);

  // Finalize the lookups and statistics
  indexer_finalizer.execute_and_clear();
  if(track_statistics)
    statistics_finalizer.execute_and_clear();
}