void DatasetLoader::ConstructBinMappersFromTextData()

in src/io/dataset_loader.cpp [990:1179]


void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
                                                    const std::vector<std::string>& sample_data,
                                                    const Parser* parser, Dataset* dataset) {
  auto t1 = std::chrono::high_resolution_clock::now();
  std::vector<std::vector<double>> sample_values;
  std::vector<std::vector<int>> sample_indices;
  std::vector<std::pair<int, double>> oneline_features;
  double label;
  for (int i = 0; i < static_cast<int>(sample_data.size()); ++i) {
    oneline_features.clear();
    // parse features
    parser->ParseOneLine(sample_data[i].c_str(), &oneline_features, &label);
    for (std::pair<int, double>& inner_data : oneline_features) {
      if (static_cast<size_t>(inner_data.first) >= sample_values.size()) {
        sample_values.resize(inner_data.first + 1);
        sample_indices.resize(inner_data.first + 1);
      }
      if (std::fabs(inner_data.second) > kZeroThreshold || std::isnan(inner_data.second)) {
        sample_values[inner_data.first].emplace_back(inner_data.second);
        sample_indices[inner_data.first].emplace_back(i);
      }
    }
  }

  dataset->feature_groups_.clear();
  dataset->num_total_features_ = std::max(static_cast<int>(sample_values.size()), parser->NumFeatures());
  if (num_machines > 1) {
    dataset->num_total_features_ = Network::GlobalSyncUpByMax(dataset->num_total_features_);
  }
  if (!feature_names_.empty()) {
    CHECK_EQ(dataset->num_total_features_, static_cast<int>(feature_names_.size()));
  }

  if (!config_.max_bin_by_feature.empty()) {
    CHECK_EQ(static_cast<size_t>(dataset->num_total_features_), config_.max_bin_by_feature.size());
    CHECK_GT(*(std::min_element(config_.max_bin_by_feature.begin(), config_.max_bin_by_feature.end())), 1);
  }

  // get forced split
  std::string forced_bins_path = config_.forcedbins_filename;
  std::vector<std::vector<double>> forced_bin_bounds = DatasetLoader::GetForcedBins(forced_bins_path,
                                                                                    dataset->num_total_features_,
                                                                                    categorical_features_);

  // check the range of label_idx, weight_idx and group_idx
  // skip label check if user input parser config file,
  // because label id is got from raw features while dataset features are consistent with customized parser.
  if (dataset->parser_config_str_.empty()) {
    CHECK(label_idx_ >= 0 && label_idx_ <= dataset->num_total_features_);
  }
  CHECK(weight_idx_ < 0 || weight_idx_ < dataset->num_total_features_);
  CHECK(group_idx_ < 0 || group_idx_ < dataset->num_total_features_);

  // fill feature_names_ if not header
  if (feature_names_.empty()) {
    for (int i = 0; i < dataset->num_total_features_; ++i) {
      std::stringstream str_buf;
      str_buf << "Column_" << i;
      feature_names_.push_back(str_buf.str());
    }
  }
  dataset->set_feature_names(feature_names_);
  std::vector<std::unique_ptr<BinMapper>> bin_mappers(dataset->num_total_features_);
  const data_size_t filter_cnt = static_cast<data_size_t>(
    static_cast<double>(config_.min_data_in_leaf* sample_data.size()) / dataset->num_data_);
  // start find bins
  if (num_machines == 1) {
    // if only one machine, find bin locally
    OMP_INIT_EX();
    #pragma omp parallel for schedule(guided)
    for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
      OMP_LOOP_EX_BEGIN();
      if (ignore_features_.count(i) > 0) {
        bin_mappers[i] = nullptr;
        continue;
      }
      BinType bin_type = BinType::NumericalBin;
      if (categorical_features_.count(i)) {
        bin_type = BinType::CategoricalBin;
      }
      bin_mappers[i].reset(new BinMapper());
      if (config_.max_bin_by_feature.empty()) {
        bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()),
                                sample_data.size(), config_.max_bin, config_.min_data_in_bin,
                                filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing, config_.zero_as_missing,
                                forced_bin_bounds[i]);
      } else {
        bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()),
                                sample_data.size(), config_.max_bin_by_feature[i],
                                config_.min_data_in_bin, filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing,
                                config_.zero_as_missing, forced_bin_bounds[i]);
      }
      OMP_LOOP_EX_END();
    }
    OMP_THROW_EX();
  } else {
    // start and len will store the process feature indices for different machines
    // machine i will find bins for features in [ start[i], start[i] + len[i] )
    std::vector<int> start(num_machines);
    std::vector<int> len(num_machines);
    int step = (dataset->num_total_features_ + num_machines - 1) / num_machines;
    if (step < 1) { step = 1; }

    start[0] = 0;
    for (int i = 0; i < num_machines - 1; ++i) {
      len[i] = std::min(step, dataset->num_total_features_ - start[i]);
      start[i + 1] = start[i] + len[i];
    }
    len[num_machines - 1] = dataset->num_total_features_ - start[num_machines - 1];
    OMP_INIT_EX();
    #pragma omp parallel for schedule(guided)
    for (int i = 0; i < len[rank]; ++i) {
      OMP_LOOP_EX_BEGIN();
      if (ignore_features_.count(start[rank] + i) > 0) {
        continue;
      }
      BinType bin_type = BinType::NumericalBin;
      if (categorical_features_.count(start[rank] + i)) {
        bin_type = BinType::CategoricalBin;
      }
      bin_mappers[i].reset(new BinMapper());
      if (static_cast<int>(sample_values.size()) <= start[rank] + i) {
        continue;
      }
      if (config_.max_bin_by_feature.empty()) {
        bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(),
                                static_cast<int>(sample_values[start[rank] + i].size()),
                                sample_data.size(), config_.max_bin, config_.min_data_in_bin,
                                filter_cnt, config_.feature_pre_filter, bin_type, config_.use_missing, config_.zero_as_missing,
                                forced_bin_bounds[i]);
      } else {
        bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(),
                                static_cast<int>(sample_values[start[rank] + i].size()),
                                sample_data.size(), config_.max_bin_by_feature[i],
                                config_.min_data_in_bin, filter_cnt, config_.feature_pre_filter, bin_type,
                                config_.use_missing, config_.zero_as_missing, forced_bin_bounds[i]);
      }
      OMP_LOOP_EX_END();
    }
    OMP_THROW_EX();
    comm_size_t self_buf_size = 0;
    for (int i = 0; i < len[rank]; ++i) {
      if (ignore_features_.count(start[rank] + i) > 0) {
        continue;
      }
      self_buf_size += static_cast<comm_size_t>(bin_mappers[i]->SizesInByte());
    }
    std::vector<char> input_buffer(self_buf_size);
    auto cp_ptr = input_buffer.data();
    for (int i = 0; i < len[rank]; ++i) {
      if (ignore_features_.count(start[rank] + i) > 0) {
        continue;
      }
      bin_mappers[i]->CopyTo(cp_ptr);
      cp_ptr += bin_mappers[i]->SizesInByte();
      // free
      bin_mappers[i].reset(nullptr);
    }
    std::vector<comm_size_t> size_len = Network::GlobalArray(self_buf_size);
    std::vector<comm_size_t> size_start(num_machines, 0);
    for (int i = 1; i < num_machines; ++i) {
      size_start[i] = size_start[i - 1] + size_len[i - 1];
    }
    comm_size_t total_buffer_size = size_start[num_machines - 1] + size_len[num_machines - 1];
    std::vector<char> output_buffer(total_buffer_size);
    // gather global feature bin mappers
    Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), total_buffer_size);
    cp_ptr = output_buffer.data();
    // restore features bins from buffer
    for (int i = 0; i < dataset->num_total_features_; ++i) {
      if (ignore_features_.count(i) > 0) {
        bin_mappers[i] = nullptr;
        continue;
      }
      bin_mappers[i].reset(new BinMapper());
      bin_mappers[i]->CopyFrom(cp_ptr);
      cp_ptr += bin_mappers[i]->SizesInByte();
    }
  }
  dataset->Construct(&bin_mappers, dataset->num_total_features_, forced_bin_bounds, Common::Vector2Ptr<int>(&sample_indices).data(),
                     Common::Vector2Ptr<double>(&sample_values).data(),
                     Common::VectorSize<int>(sample_indices).data(), static_cast<int>(sample_indices.size()), sample_data.size(), config_);
  if (dataset->has_raw()) {
    dataset->ResizeRaw(static_cast<int>(sample_data.size()));
  }

  auto t2 = std::chrono::high_resolution_clock::now();
  Log::Info("Construct bin mappers from text data time %.2f seconds",
            std::chrono::duration<double, std::milli>(t2 - t1) * 1e-3);
}