// void recsys_model_base::setup_and_train()
//
// From src/toolkits/recsys/recsys_model_base.cpp [58:337]

/**
 * End-to-end setup and training entry point for a recommender model.
 *
 * Validates the configured user/item/target column options against `data`,
 * decides which observation columns to include (warning about the rest),
 * builds the v2::ml_data training structure (plus a second, item-sorted copy
 * for ALS-family solvers), attaches user/item side data where supported,
 * runs the solver via train(), and records dataset statistics and timings
 * into the model's `state` map.
 *
 * @param data            Observation data; must contain the configured
 *                        user_id and item_id columns (and the target column,
 *                        if one is required). Must be open for reading.
 * @param user_side_data  Optional per-user side features (may be empty).
 * @param item_side_data  Optional per-item side features (may be empty).
 * @param other_data      Extra model-specific data forwarded to
 *                        set_extra_data().
 *
 * @throws via log_and_throw on invalid column configuration (user == item
 *         column, missing/invalid target, ALS with num_factors == 0).
 */
void recsys_model_base::setup_and_train(
    const sframe& data,
    const sframe& user_side_data,
    const sframe& item_side_data,
    const std::map<std::string, variant_type>& other_data) {

  DASSERT_TRUE(data.is_opened_for_read());

  // Times the data-preparation phase; restarted later to time training.
  turi::timer t;
  t.start();

  // set up the metadata
  std::string user_column = get_option_value("user_id");
  std::string item_column = get_option_value("item_id");

  // ALS-family solvers ("als" / implicit "ials") get special handling below:
  // they require num_factors > 0, do not support side features, and need a
  // second, item-major copy of the training data.
  bool is_als = false;
  if (state.count("solver") > 0){
  std::string solver = variant_get_value<std::string>(state.at("solver"));
    is_als = ( (solver == "ials") || (solver == "als"));
    if (is_als && (get_option_value("num_factors") == 0)) {
      log_and_throw("For solver='" + solver +  "', num_factors must be > 0.");
    }
  }

  if (user_column == item_column)
    log_and_throw("User column and item column must be different.");

  size_t orig_user_column_index = data.column_index(user_column);
  size_t orig_item_column_index = data.column_index(item_column);

  // User and item always come first, in this order, in the ml_data layout.
  std::vector<std::string> column_ordering = {user_column, item_column};
  std::vector<std::string> included_columns = column_ordering;

  std::string target_column = get_option_value("target");
  bool target_is_present = (target_column != "");

  // A target named in the options but absent from the data is an error even
  // if this model type would not use it.
  if(target_is_present && !data.contains_column(target_column)) {
    log_and_throw(std::string("Target column given as '")
                  + target_column + "', but this is not present in the provided data.");
  }

  // Model subclass decides whether the target column participates in training.
  bool use_target = use_target_column(target_is_present);

  // size_t(-1) acts as a "no target" sentinel so the loop below never
  // matches it against a real column index.
  size_t orig_target_column_index = size_t(-1);
  if (use_target) {

    if (target_column == user_column || target_column == item_column)
      log_and_throw("Target column must be different than the user column and the item column.");

    if(target_column == "")
      log_and_throw(std::string("Method ")
                    + name() + " requires a numeric target column of scores or ratings; please specify this column using target_column = <name>.");

    if (!data.contains_column(target_column))
      log_and_throw(std::string("Method ")
                    + name() + " requires a numeric target column of scores or ratings; The provided target column " + target_column + " not found.");

    orig_target_column_index = data.column_index(target_column);
    included_columns.push_back(target_column);
  } else {
    // Normalize to "" so the unused target name can't leak into set_data().
    target_column = "";
  }

  // See if there are additional columns present:
  // (anything that is not the user, item, or target column).
  std::vector<size_t> additional_columns;
  for(size_t i = 0; i < data.num_columns(); ++i) {
    if(i != orig_user_column_index
       && i != orig_item_column_index
       && i != orig_target_column_index) {
      additional_columns.push_back(i);
    }
  }

  if(!additional_columns.empty()) {

    // Some model types train on extra observation features; others ignore
    // them and only warn the user about what is being dropped.
    if(include_columns_beyond_user_item()) {

      for(size_t c_idx : additional_columns)
        included_columns.push_back(data.column_name(c_idx));

    } else {

      // Special-cased message: a single extra numeric column with no target
      // in use looks like the user meant it to be the rating/target column.
      if(additional_columns.size() == 1
         && !use_target
         && (data.column_type(additional_columns[0]) == flex_type_enum::FLOAT
             || data.column_type(additional_columns[0]) == flex_type_enum::INTEGER)) {

        logprogress_stream << "Warning: " << "Column '"
                           << data.column_name(additional_columns[0])
                           << "' ignored." << std::endl;
        logprogress_stream << " To use one of these as a rating column, specify the column name to be used as target "
                           << data.column_name(additional_columns[0])
                           << "\" and use a method that allows the use of a target."
                           << std::endl;
      } else {

        // Build a comma-separated list of all ignored column names.
        // Safe: additional_columns is non-empty here, so size() - 1 is valid.
        std::ostringstream columns_ss;

        for(size_t j = 0; j < additional_columns.size() - 1; ++j)
          columns_ss << data.column_name(additional_columns[j]) << ", ";

        columns_ss << data.column_name(additional_columns.back());

        if(!use_target) {
          logprogress_stream
              << "Warning: Ignoring columns " << columns_ss.str() << ";"
              << std::endl;
          logprogress_stream
              << " To use one of these as a rating column, specify the column name to be used as target "
              << std::endl;
          logprogress_stream
              << "    and use a method that allows the use of a target."
              << std::endl;
        } else {
          logprogress_stream
              << "Warning: Ignoring columns " << columns_ss.str() << ";"
              << std::endl;
          logprogress_stream
              << "    To use these columns in scoring predictions, use a model that allows the use of additional features."
              << std::endl;
        }
      }
    }
  }

  // Construct the first ml_data instance.
  // NOTE(review): option names suggest user/item-sorted training data,
  // unique side-column naming, a numeric target, and tolerance of new
  // columns at predict time — confirm against v2::ml_data documentation.
  v2::ml_data train_ml( {
      {"sort_by_first_two_columns_on_train", true},
      {"uniquify_side_column_names", true},
      {"target_column_always_numeric", true},
      {"ignore_new_columns_after_train", true}});

  // Add in the primary data
  train_ml.set_data(data.select_columns(included_columns),

                    target_column,

                    // forced column ordering
                    {user_column, item_column},

                    // Mode overrides -- make sure these are treated this way.
                    { {user_column, v2::ml_column_mode::CATEGORICAL},
                      {item_column, v2::ml_column_mode::CATEGORICAL} } );

  // Side data is only attached for non-ALS solvers; the ALS path warns
  // below instead of failing.
  if(user_side_data.num_columns() != 0 && is_als == false) {
    train_ml.add_side_data(user_side_data, user_column);
  }

  if(item_side_data.num_columns() != 0 && is_als == false) {
    train_ml.add_side_data(item_side_data, item_column);
  }

  if((item_side_data.num_columns() + user_side_data.num_columns() > 0)
                                                        && (is_als == true)) {
    logprogress_stream << "Warning: "
        << "This solver currently does not support side features. "
        << "Proceeding with training without side features."
        << std::endl;
  }

  logprogress_stream << "Preparing data set." << std::endl;
  train_ml.fill();

  metadata = train_ml.metadata();

  ////////////////////////////////////////////////////////////

  // Set other data.  Note -- this sometimes changes the indexing,
  // hence the code later on.
  set_extra_data(other_data);

  // Now, we are genuinely done with the setup step.
  // (Must run after set_extra_data(), since that call may have grown the
  // column indices.)
  metadata->set_training_index_sizes_to_current_column_sizes();

  ////////////////////////////////////////////////////////////

  // Cache the per-user item lists used elsewhere in the model.
  trained_user_items = make_user_item_lists(train_ml);

  logprogress_stream << "    Data has " << train_ml.size() << " observations with "
                     << metadata->column_size(USER_COLUMN_INDEX)
                     << " users and "
                     << metadata->column_size(ITEM_COLUMN_INDEX)
                     << " items." << std::endl;

  double data_load_time = t.current_time();
  logprogress_stream << "    Data prepared in: " << data_load_time
                     << "s" << std::endl;
  state["data_load_time"] = to_variant(data_load_time);


  // Train using ALS
  if (is_als == true) {

    // Construct the first ml_data instance
    // (same options as train_ml, but sorted item-first; ALS alternates
    // between user-major and item-major passes over the same data).
    v2::ml_data train_ml_by_item( {
        {"sort_by_first_two_columns_on_train", true},
        {"uniquify_side_column_names", true},
        {"target_column_always_numeric", true},
        {"ignore_new_columns_after_train", true}});

    train_ml_by_item.set_data(data.select_columns(included_columns),
                      target_column,
                      // forced column ordering
                      {item_column, user_column},
                      // Mode overrides -- make sure these are treated this way.
                      { {item_column, v2::ml_column_mode::CATEGORICAL},
                        {user_column, v2::ml_column_mode::CATEGORICAL} } );

    train_ml_by_item.fill();

    // Restart the timer so training_time excludes data preparation.
    t.start();
    std::map<std::string, flexible_type> ret = train(train_ml, train_ml_by_item);
    // NOTE: map::insert does not overwrite keys already present in state.
    state.insert(ret.begin(), ret.end());

  // Train the model
  } else {
    t.start();
    std::map<std::string, flexible_type> ret = train(train_ml);
    state.insert(ret.begin(), ret.end());
  }

  double training_time = t.current_time();
  state["training_time"] = training_time;

  // Save information about the dataset
  state["num_observations"]            = to_variant(train_ml.size());
  state["num_users"]                   = to_variant(metadata->column_size(USER_COLUMN_INDEX));
  state["num_items"]                   = to_variant(metadata->column_size(ITEM_COLUMN_INDEX));
  state["num_features"]                = to_variant(metadata->num_columns());
  state["num_user_side_features"]      = to_variant(user_side_data.num_columns());
  state["num_item_side_features"]      = to_variant(item_side_data.num_columns());
  state["observation_data_column_names"] = to_variant(included_columns);
  state["user_side_data_column_names"] = to_variant(user_side_data.column_names());
  state["item_side_data_column_names"] = to_variant(item_side_data.column_names());

  // Record side-data column types by name for introspection in state.
  {
    std::vector<flexible_type> user_type_names(user_side_data.num_columns());

    for (size_t i = 0; i < user_side_data.num_columns(); ++i)
      user_type_names[i] = flex_type_enum_to_name(user_side_data.column_type(i));

    state["user_side_data_column_types"] = to_variant(user_type_names);
  }

  {
    std::vector<flexible_type> item_type_names(item_side_data.num_columns());

    for (size_t i = 0; i < item_side_data.num_columns(); ++i)
      item_type_names[i] = flex_type_enum_to_name(item_side_data.column_type(i));

    state["item_side_data_column_types"] = to_variant(item_type_names);
  }

  // If the solver did not report its own training RMSE, compute it here by
  // re-predicting the training set.
  if (use_target && state.count("training_rmse") == 0) {

    // Calculate the training rmse manually.  given data is in a
    // different order.

    sframe predictions = predict(train_ml);
    // One squared-error accumulator per worker thread to avoid contention.
    std::vector<double> total_se_accumulator(thread::cpu_count(), 0);

    in_parallel([&](size_t thread_idx, size_t num_threads) {

        total_se_accumulator[thread_idx] = 0;

        // Both iterators shard the data the same way, so they stay aligned
        // row-for-row within each thread's slice.
        auto ml_it = train_ml.get_iterator(thread_idx, num_threads);
        parallel_sframe_iterator sf_it(predictions, thread_idx, num_threads);

        for(; !ml_it.done(); ++ml_it, ++sf_it) {
          double diff = sf_it.value(0) - ml_it.target_value();
          total_se_accumulator[thread_idx] += (diff * diff);
        }
      });

    // RMSE = sqrt(sum of squared errors / number of observations).
    state["training_rmse"] = std::sqrt(std::accumulate(
        total_se_accumulator.begin(), total_se_accumulator.end(), double(0.0)) / train_ml.size());
  }

  return;
}