in src/io/dataset_loader.cpp [990:1179]
void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
const std::vector<std::string>& sample_data,
const Parser* parser, Dataset* dataset) {
auto t1 = std::chrono::high_resolution_clock::now();
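  // collect the sampled rows column-wise: sample_values[j] holds the observed values of
  // feature j and sample_indices[j] the corresponding sampled-row indices; both feed
  // BinMapper::FindBin and Dataset::Construct below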
std::vector<std::vector<double>> sample_values;
std::vector<std::vector<int>> sample_indices;
std::vector<std::pair<int, double>> oneline_features;
double label;
for (int i = 0; i < static_cast<int>(sample_data.size()); ++i) {
oneline_features.clear();
// parse features
parser->ParseOneLine(sample_data[i].c_str(), &oneline_features, &label);
for (std::pair<int, double>& inner_data : oneline_features) {
if (static_cast<size_t>(inner_data.first) >= sample_values.size()) {
sample_values.resize(inner_data.first + 1);
sample_indices.resize(inner_data.first + 1);
}
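      // keep only effectively non-zero values (|value| > kZeroThreshold) and NaNs;
      // zeros stay implicit so the per-feature sample remains sparse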
if (std::fabs(inner_data.second) > kZeroThreshold || std::isnan(inner_data.second)) {
sample_values[inner_data.first].emplace_back(inner_data.second);
sample_indices[inner_data.first].emplace_back(i);
}
}
}
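  // the total feature count is the larger of the widest sampled row and the parser's count;
  // in distributed runs it is synced to the max across machines so every rank agrees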
dataset->feature_groups_.clear();
dataset->num_total_features_ = std::max(static_cast<int>(sample_values.size()), parser->NumFeatures());
if (num_machines > 1) {
dataset->num_total_features_ = Network::GlobalSyncUpByMax(dataset->num_total_features_);
}
if (!feature_names_.empty()) {
CHECK_EQ(dataset->num_total_features_, static_cast<int>(feature_names_.size()));
}
if (!config_.max_bin_by_feature.empty()) {
CHECK_EQ(static_cast<size_t>(dataset->num_total_features_), config_.max_bin_by_feature.size());
CHECK_GT(*(std::min_element(config_.max_bin_by_feature.begin(), config_.max_bin_by_feature.end())), 1);
}
  // load forced bin bounds, if a forcedbins file was provided
std::string forced_bins_path = config_.forcedbins_filename;
std::vector<std::vector<double>> forced_bin_bounds = DatasetLoader::GetForcedBins(forced_bins_path,
dataset->num_total_features_,
categorical_features_);
  // check the range of label_idx, weight_idx and group_idx
  // skip the label check if the user supplied a parser config file, because the label index
  // refers to the raw features, while the dataset features are produced by the customized parser
if (dataset->parser_config_str_.empty()) {
CHECK(label_idx_ >= 0 && label_idx_ <= dataset->num_total_features_);
}
CHECK(weight_idx_ < 0 || weight_idx_ < dataset->num_total_features_);
CHECK(group_idx_ < 0 || group_idx_ < dataset->num_total_features_);
  // fill feature_names_ if the input has no header
if (feature_names_.empty()) {
for (int i = 0; i < dataset->num_total_features_; ++i) {
std::stringstream str_buf;
str_buf << "Column_" << i;
feature_names_.push_back(str_buf.str());
}
}
dataset->set_feature_names(feature_names_);
std::vector<std::unique_ptr<BinMapper>> bin_mappers(dataset->num_total_features_);
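  // min_data_in_leaf rescaled from the full dataset to the sample size; FindBin uses it
  // together with feature_pre_filter to decide whether a feature can be pre-filtered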
  const data_size_t filter_cnt = static_cast<data_size_t>(
      static_cast<double>(config_.min_data_in_leaf * sample_data.size()) / dataset->num_data_);
  // start finding bins
if (num_machines == 1) {
// if only one machine, find bin locally
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
BinType bin_type = BinType::NumericalBin;
if (categorical_features_.count(i)) {
bin_type = BinType::CategoricalBin;
}
bin_mappers[i].reset(new BinMapper());
if (config_.max_bin_by_feature.empty()) {
bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()),
sample_data.size(), config_.max_bin, config_.min_data_in_bin,
filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing, config_.zero_as_missing,
forced_bin_bounds[i]);
} else {
bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()),
sample_data.size(), config_.max_bin_by_feature[i],
config_.min_data_in_bin, filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing,
config_.zero_as_missing, forced_bin_bounds[i]);
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
} else {
    // start and len store the feature index range assigned to each machine:
    // machine i finds bins for features in [ start[i], start[i] + len[i] )
std::vector<int> start(num_machines);
std::vector<int> len(num_machines);
int step = (dataset->num_total_features_ + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }
start[0] = 0;
for (int i = 0; i < num_machines - 1; ++i) {
len[i] = std::min(step, dataset->num_total_features_ - start[i]);
start[i + 1] = start[i] + len[i];
}
len[num_machines - 1] = dataset->num_total_features_ - start[num_machines - 1];
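    // each machine constructs bin mappers only for its own feature range; the results are
    // exchanged with the other machines via Allgather below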
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
BinType bin_type = BinType::NumericalBin;
if (categorical_features_.count(start[rank] + i)) {
bin_type = BinType::CategoricalBin;
}
bin_mappers[i].reset(new BinMapper());
      if (static_cast<int>(sample_values.size()) <= start[rank] + i) {
        // no sampled values for this feature; keep the default (trivial) BinMapper
        continue;
      }
      // per-feature arrays are indexed by the global feature index (start[rank] + i),
      // not by the machine-local offset i
      if (config_.max_bin_by_feature.empty()) {
        bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(),
                                static_cast<int>(sample_values[start[rank] + i].size()),
                                sample_data.size(), config_.max_bin, config_.min_data_in_bin,
                                filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing,
                                config_.zero_as_missing, forced_bin_bounds[start[rank] + i]);
      } else {
        bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(),
                                static_cast<int>(sample_values[start[rank] + i].size()),
                                sample_data.size(), config_.max_bin_by_feature[start[rank] + i],
                                config_.min_data_in_bin, filter_cnt, config_.feature_pre_filter, bin_type,
                                config_.use_missing, config_.zero_as_missing, forced_bin_bounds[start[rank] + i]);
      }
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
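    // serialize this machine's bin mappers into one contiguous buffer for the Allgather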
comm_size_t self_buf_size = 0;
for (int i = 0; i < len[rank]; ++i) {
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
self_buf_size += static_cast<comm_size_t>(bin_mappers[i]->SizesInByte());
}
std::vector<char> input_buffer(self_buf_size);
auto cp_ptr = input_buffer.data();
for (int i = 0; i < len[rank]; ++i) {
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
bin_mappers[i]->CopyTo(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
      // free the local copy; it is restored from the gathered buffer below
bin_mappers[i].reset(nullptr);
}
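    // exchange per-machine buffer sizes and compute each machine's offset in the gathered buffer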
std::vector<comm_size_t> size_len = Network::GlobalArray(self_buf_size);
std::vector<comm_size_t> size_start(num_machines, 0);
for (int i = 1; i < num_machines; ++i) {
size_start[i] = size_start[i - 1] + size_len[i - 1];
}
comm_size_t total_buffer_size = size_start[num_machines - 1] + size_len[num_machines - 1];
std::vector<char> output_buffer(total_buffer_size);
// gather global feature bin mappers
Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), total_buffer_size);
cp_ptr = output_buffer.data();
    // restore every feature's bin mapper from the gathered buffer
for (int i = 0; i < dataset->num_total_features_; ++i) {
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->CopyFrom(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
}
}
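  // construct the Dataset: bundle the bin mappers into feature groups, using the sampled
  // indices/values to guide grouping decisions such as exclusive feature bundling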
dataset->Construct(&bin_mappers, dataset->num_total_features_, forced_bin_bounds, Common::Vector2Ptr<int>(&sample_indices).data(),
Common::Vector2Ptr<double>(&sample_values).data(),
Common::VectorSize<int>(sample_indices).data(), static_cast<int>(sample_indices.size()), sample_data.size(), config_);
if (dataset->has_raw()) {
dataset->ResizeRaw(static_cast<int>(sample_data.size()));
}
  auto t2 = std::chrono::high_resolution_clock::now();
  Log::Info("Construct bin mappers from text data time %.2f seconds",
            std::chrono::duration<double>(t2 - t1).count());
}