in src/toolkits/recsys/recsys_model_base.cpp [58:337]
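/**
 * Sets up the training data structures (ml_data, metadata, and cached
 * user->item lists) from the raw observation SFrame plus optional user and
 * item side data, dispatches to the model-specific train() overload, and
 * records timing and dataset statistics in `state`.
 */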
void recsys_model_base::setup_and_train(
const sframe& data,
const sframe& user_side_data,
const sframe& item_side_data,
const std::map<std::string, variant_type>& other_data) {
DASSERT_TRUE(data.is_opened_for_read());
turi::timer t;
t.start();
// set up the metadata
std::string user_column = get_option_value("user_id");
std::string item_column = get_option_value("item_id");
bool is_als = false;
if (state.count("solver") > 0) {
std::string solver = variant_get_value<std::string>(state.at("solver"));
is_als = (solver == "ials") || (solver == "als");
if (is_als && (get_option_value("num_factors") == 0)) {
log_and_throw("For solver='" + solver + "', num_factors must be > 0.");
}
}
if (user_column == item_column)
log_and_throw("User column and item column must be different.");
size_t orig_user_column_index = data.column_index(user_column);
size_t orig_item_column_index = data.column_index(item_column);
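// The user and item columns are always the first two columns handed to
// ml_data; the target and any extra feature columns are appended after them.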
std::vector<std::string> column_ordering = {user_column, item_column};
std::vector<std::string> included_columns = column_ordering;
std::string target_column = get_option_value("target");
bool target_is_present = (target_column != "");
if(target_is_present && !data.contains_column(target_column)) {
log_and_throw(std::string("Target column given as '")
+ target_column + "', but this is not present in the provided data.");
}
bool use_target = use_target_column(target_is_present);
size_t orig_target_column_index = size_t(-1);
if (use_target) {
if (target_column == user_column || target_column == item_column)
log_and_throw("Target column must be different from the user column and the item column.");
if(target_column == "")
log_and_throw(std::string("Method ")
+ name() + " requires a numeric target column of scores or ratings; please specify this column using target_column = <name>.");
if (!data.contains_column(target_column))
log_and_throw(std::string("Method ")
+ name() + " requires a numeric target column of scores or ratings; the provided target column '" + target_column + "' was not found.");
orig_target_column_index = data.column_index(target_column);
included_columns.push_back(target_column);
} else {
target_column = "";
}
// See if there are additional columns present:
std::vector<size_t> additional_columns;
for(size_t i = 0; i < data.num_columns(); ++i) {
if(i != orig_user_column_index
&& i != orig_item_column_index
&& i != orig_target_column_index) {
additional_columns.push_back(i);
}
}
if(!additional_columns.empty()) {
if(include_columns_beyond_user_item()) {
for(size_t c_idx : additional_columns)
included_columns.push_back(data.column_name(c_idx));
} else {
if(additional_columns.size() == 1
&& !use_target
&& (data.column_type(additional_columns[0]) == flex_type_enum::FLOAT
|| data.column_type(additional_columns[0]) == flex_type_enum::INTEGER)) {
logprogress_stream << "Warning: " << "Column '"
<< data.column_name(additional_columns[0])
<< "' ignored." << std::endl;
logprogress_stream << " To use one of these as a rating column, specify the column name to be used as target "
<< data.column_name(additional_columns[0])
<< "\" and use a method that allows the use of a target."
<< std::endl;
} else {
std::ostringstream columns_ss;
for(size_t j = 0; j < additional_columns.size() - 1; ++j)
columns_ss << data.column_name(additional_columns[j]) << ", ";
columns_ss << data.column_name(additional_columns.back());
if(!use_target) {
logprogress_stream
<< "Warning: Ignoring columns " << columns_ss.str() << ";"
<< std::endl;
logprogress_stream
<< "    To use one of these as a rating column, specify the column name to be used as the target"
<< std::endl;
logprogress_stream
<< "    and use a method that allows the use of a target."
<< std::endl;
} else {
logprogress_stream
<< "Warning: Ignoring columns " << columns_ss.str() << ";"
<< std::endl;
logprogress_stream
<< " To use these columns in scoring predictions, use a model that allows the use of additional features."
<< std::endl;
}
}
}
}
// Construct the first ml_data instance
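// Reading the option names: observations are sorted by the leading
// (user, item) columns on train, side-data column names are uniquified,
// the target must be numeric, and columns that only appear after training
// are ignored.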
v2::ml_data train_ml( {
{"sort_by_first_two_columns_on_train", true},
{"uniquify_side_column_names", true},
{"target_column_always_numeric", true},
{"ignore_new_columns_after_train", true}});
// Add in the primary data
train_ml.set_data(data.select_columns(included_columns),
target_column,
// forced column ordering
{user_column, item_column},
// Mode overrides -- make sure these are treated this way.
{ {user_column, v2::ml_column_mode::CATEGORICAL},
{item_column, v2::ml_column_mode::CATEGORICAL} } );
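// Side data adds extra per-user / per-item feature columns. The ALS solvers
// do not support side features, so side data is skipped with a warning in
// that case.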
if (user_side_data.num_columns() != 0 && !is_als) {
train_ml.add_side_data(user_side_data, user_column);
}
if (item_side_data.num_columns() != 0 && !is_als) {
train_ml.add_side_data(item_side_data, item_column);
}
if (is_als
&& (item_side_data.num_columns() + user_side_data.num_columns() > 0)) {
logprogress_stream << "Warning: "
<< "This solver currently does not support side features. "
<< "Proceeding with training without side features."
<< std::endl;
}
logprogress_stream << "Preparing data set." << std::endl;
train_ml.fill();
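// Keep a handle to the filled metadata (column indexers and sizes); the
// dataset statistics recorded below are read from it.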
metadata = train_ml.metadata();
////////////////////////////////////////////////////////////
// Set other data. Note: this can change the indexing, which is why the
// training index sizes are fixed only after this call.
set_extra_data(other_data);
// Now, we are genuinely done with the setup step.
metadata->set_training_index_sizes_to_current_column_sizes();
////////////////////////////////////////////////////////////
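// Cache the per-user lists of observed items built from the filled ml_data.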
trained_user_items = make_user_item_lists(train_ml);
logprogress_stream << " Data has " << train_ml.size() << " observations with "
<< metadata->column_size(USER_COLUMN_INDEX)
<< " users and "
<< metadata->column_size(ITEM_COLUMN_INDEX)
<< " items." << std::endl;
double data_load_time = t.current_time();
logprogress_stream << " Data prepared in: " << data_load_time
<< "s" << std::endl;
state["data_load_time"] = to_variant(data_load_time);
// Train using ALS
if (is_als) {
// Construct a second ml_data instance keyed by item for the ALS solver.
v2::ml_data train_ml_by_item( {
{"sort_by_first_two_columns_on_train", true},
{"uniquify_side_column_names", true},
{"target_column_always_numeric", true},
{"ignore_new_columns_after_train", true}});
train_ml_by_item.set_data(data.select_columns(included_columns),
target_column,
// forced column ordering
{item_column, user_column},
// Mode overrides -- make sure these are treated this way.
{ {item_column, v2::ml_column_mode::CATEGORICAL},
{user_column, v2::ml_column_mode::CATEGORICAL} } );
train_ml_by_item.fill();
t.start();
std::map<std::string, flexible_type> ret = train(train_ml, train_ml_by_item);
state.insert(ret.begin(), ret.end());
} else {
// Train the model using the single (user, item)-sorted ml_data instance.
t.start();
std::map<std::string, flexible_type> ret = train(train_ml);
state.insert(ret.begin(), ret.end());
}
double training_time = t.current_time();
state["training_time"] = training_time;
// Save information about the dataset
state["num_observations"] = to_variant(train_ml.size());
state["num_users"] = to_variant(metadata->column_size(USER_COLUMN_INDEX));
state["num_items"] = to_variant(metadata->column_size(ITEM_COLUMN_INDEX));
state["num_features"] = to_variant(metadata->num_columns());
state["num_user_side_features"] = to_variant(user_side_data.num_columns());
state["num_item_side_features"] = to_variant(item_side_data.num_columns());
state["observation_data_column_names"] = to_variant(included_columns);
state["user_side_data_column_names"] = to_variant(user_side_data.column_names());
state["item_side_data_column_names"] = to_variant(item_side_data.column_names());
{
std::vector<flexible_type> user_type_names(user_side_data.num_columns());
for (size_t i = 0; i < user_side_data.num_columns(); ++i)
user_type_names[i] = flex_type_enum_to_name(user_side_data.column_type(i));
state["user_side_data_column_types"] = to_variant(user_type_names);
}
{
std::vector<flexible_type> item_type_names(item_side_data.num_columns());
for (size_t i = 0; i < item_side_data.num_columns(); ++i)
item_type_names[i] = flex_type_enum_to_name(item_side_data.column_type(i));
state["item_side_data_column_types"] = to_variant(item_type_names);
}
if (use_target && state.count("training_rmse") == 0) {
// Calculate the training RMSE manually, since the given data is in a
// different order.
sframe predictions = predict(train_ml);
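// Accumulate the squared prediction error in parallel, one partial sum per
// thread, then reduce to a single RMSE.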
std::vector<double> total_se_accumulator(thread::cpu_count(), 0);
in_parallel([&](size_t thread_idx, size_t num_threads) {
total_se_accumulator[thread_idx] = 0;
auto ml_it = train_ml.get_iterator(thread_idx, num_threads);
parallel_sframe_iterator sf_it(predictions, thread_idx, num_threads);
for(; !ml_it.done(); ++ml_it, ++sf_it) {
double diff = sf_it.value(0) - ml_it.target_value();
total_se_accumulator[thread_idx] += (diff * diff);
}
});
state["training_rmse"] = std::sqrt(std::accumulate(
total_se_accumulator.begin(), total_se_accumulator.end(), double(0.0)) / train_ml.size());
}
return;
}