void ml_data::_fill_data_blocks()

in src/ml/ml_data/ml_data.cpp [328:643]


void ml_data::_fill_data_blocks(const sframe& raw_data,
                                bool immutable_metadata,
                                bool track_statistics,
                                ml_missing_value_action mva,
                                const std::pair<size_t, size_t>& row_bounds,
                                const std::set<std::string>& sorted_columns) {

  // Check that the metadata has been set up.
  ASSERT_TRUE(_metadata != nullptr);

  if(rm.metadata_vect.empty()) {
    data_blocks.reset(new sarray<row_data_block>);
    data_blocks->open_for_write(1);
    data_blocks->close();
    return;
  }

  size_t max_num_threads = thread::cpu_count();

  ////////////////////////////////////////////////////////////////////////////////
  // Step 2: Set up the target.

  std::shared_ptr<sarray<flexible_type> > target;

  if(rm.has_target) {

    target = raw_data.select_column(_metadata->target_column_name());

    check_type_consistent_with_mode(
        _metadata->target_column_name(), target->get_type(), _metadata->target_column_mode());
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 3: Set up the row bounds.

  const size_t num_rows = row_bounds.second - row_bounds.first;
  const size_t row_lb = row_bounds.first;
  const size_t row_ub = row_bounds.second;


  ////////////////////////////////////////////////////////////////////////////////
  // Step 3.1: Check for an empty sframe.  In this case, just clear
  // everything and exit.  (We need to handle this case explicitly,
  // since the user is allowed to pass in sframe() to signal that the
  // ml_data should be created empty.)

  if(num_rows == 0) {
    data_blocks.reset(new sarray<row_data_block>);
    data_blocks->open_for_write(1);
    data_blocks->close();

    _max_row_size = 0;

    return;
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 4: Set up metadata and input iterators

  // Set up all the proper data input sources for iterating through the data.
  // The full metadata includes the target as well, so we can treat the
  // target just like any other column.  This makes the bookkeeping here
  // easier; the target gets unpacked on the ml_data_iterator end.

  std::vector<std::shared_ptr<sarray<flexible_type> > > input_data;

  for(const auto& m : rm.metadata_vect) {
    input_data.push_back(raw_data.select_column(m->name));
    check_type_consistent_with_mode(m->name, raw_data.column_type(m->name), m->mode);
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 5: Initialize all of the indexing and statistics classes

  scoped_finally indexer_finalizer;
  scoped_finally statistics_finalizer;

  // Initialize the metadata and key parts of ml_data.
  //
  // Each initialize() call below must be paired with a finalize() call;
  // registering the finalizers with scoped_finally guarantees this even
  // if we leave this function early.
  for(const auto& m : rm.metadata_vect) {

    std::shared_ptr<column_indexer> indexer = m->indexer;

    indexer->initialize();
    indexer_finalizer.add([indexer](){indexer->finalize();});

    if(track_statistics) {
      std::shared_ptr<column_statistics> statistics = m->statistics;

      statistics->initialize();
      statistics_finalizer.add([statistics](){statistics->finalize();});
    }
  }

  // Track the maximum row size per segment; computing this up front
  // allows for efficient allocation later on.
  std::vector<size_t> max_row_size_by_segment(max_num_threads, 0);

  ////////////////////////////////////////////////////////////////////////////////
  // Step 6: Open the readers.  Opening is slow and the readers are
  // independent, so do it in parallel.

  std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > column_readers(rm.total_num_columns);

  parallel_for(0, rm.total_num_columns, [&](size_t c_idx) {

      auto m = rm.metadata_vect[c_idx];

      if(!m->is_untranslated_column()) {

        ASSERT_MSG(input_data[c_idx]->is_opened_for_read(),
                   "Input data not properly set up for reading.");

        column_readers[c_idx] = input_data[c_idx]->get_reader();

        // Deterministically insert the values from the first rows (at
        // most 10000 of them) into the index in order.  This makes a
        // number of test cases much easier to write.
        if(mode_is_indexed(m->mode)
           && (sorted_columns.empty() || (sorted_columns.count(m->name) == 0))) {
          std::vector<flexible_type> vv;
          column_readers[c_idx]->read_rows(row_lb, std::min(row_lb + 10000, row_ub), vv);
          m->indexer->insert_values_into_index(vv);
        }
      }
    });

  // For each column requested as sorted, gather all of its values, sort
  // the distinct values, and insert them into the indexer in that order.
  // This pass could be optimized.
  if(!sorted_columns.empty()) {
    for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {

      const auto& m = rm.metadata_vect[c_idx];

      if(sorted_columns.count(m->name)) {

        column_indexer _idxr(m->name, m->mode, m->original_column_type);

        {
          _idxr.initialize();
          scoped_finally idx_fin([&](){_idxr.finalize();});

          // Add everything to the index.
          in_parallel([&](size_t thread_idx, size_t num_threads) {

              size_t n_rows = row_ub - row_lb;
              size_t start_idx = row_lb + (thread_idx * n_rows) / num_threads;
              size_t end_idx = row_lb + ((thread_idx + 1) * n_rows) / num_threads;

              std::vector<flexible_type> vv;

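              // Read in strides of 4096 rows so per-thread memory use
              // stays bounded while building the index.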
              for(size_t row_idx = start_idx; row_idx < end_idx; row_idx += 4096) {

                column_readers[c_idx]->read_rows(row_idx, std::min(row_ub, row_idx + 4096), vv);

                for(const auto& v : vv) {
                  _idxr.map_value_to_index(thread_idx, v);
                }
              }
            });
        }

        // Pull out the values.
        std::vector<flexible_type> values = _idxr.reset_and_return_values();

        // Sort
        std::sort(values.begin(), values.end(),
                  [](const flexible_type& v1, const flexible_type& v2) {
                    if(UNLIKELY(v1.get_type() != v2.get_type())) {
                      return (int(v1.get_type()) < int(v2.get_type()));
                    } else if(UNLIKELY(v1.get_type() == flex_type_enum::UNDEFINED)) {
                      return false;
                    } else {
                      return v1 < v2;
                    }
                  });
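        // Note: this comparator yields a total order in which values are
        // grouped by type (in flex_type_enum order), all UNDEFINED values
        // compare as equivalent, and values of a common type are ordered
        // by their natural comparison.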

        // Insert.
        m->indexer->insert_values_into_index(values);
      }
    }
  }


  ////////////////////////////////////////////////////////////////////////////////
  // Set the number of rows in each row block.

  row_block_size = estimate_row_block_size(num_rows, rm, column_readers);
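  // (Presumably, estimate_row_block_size balances per-block overhead
  // against memory use based on the typical row width; see its
  // definition for the exact heuristic.)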

  // Open the output writers, one segment per thread.  (In a shuffled
  // output mode, one would create more segments than needed and write
  // each block to a random segment, which gives a reasonably good
  // shuffle; here each thread writes to its own segment.)
  data_blocks.reset(new sarray<row_data_block>);

  size_t num_output_segments = max_num_threads;
  data_blocks->open_for_write(num_output_segments);

  std::vector<typename sarray<row_data_block>::iterator> output_iterators(num_output_segments);
  std::vector<mutex> output_iterator_locks(num_output_segments);

  for(size_t i = 0; i < output_iterators.size(); ++i)
    output_iterators[i] = data_blocks->get_output_iterator(i);
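
  // Each thread writes only to its own segment's output iterator, so
  // blocks within a segment stay in row order; the per-segment locks
  // above would matter only if blocks were written to arbitrary
  // segments (as in the shuffled-output mode noted above).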

  // Run through the original data in parallel, mapping indices and tracking
  // everything as needed for the metadata statistics.
  in_parallel([&](size_t thread_idx, size_t num_threads) {

      // Set up start points for the segments.  To make the indexing sane, the
      // rows are stored in blocks of row_block_size rows. This must be true
      // across segments.  Thus we must set up the row indexing so that each
      // thread starts processing at one of the block boundaries.

      // Equal to flooring (thread_idx * num_rows) / num_threads
      // down to the nearest multiple of row_block_size.
      size_t segment_row_index_start =
          row_block_size * size_t( ((thread_idx * num_rows) / num_threads)
                                   / row_block_size);

      size_t segment_row_index_end =
          ( (thread_idx == num_threads - 1)
            ? num_rows
            : row_block_size * size_t( (((thread_idx + 1) * num_rows)
                                        / num_threads) / row_block_size));
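
      // Example (illustrative numbers): with num_rows = 1000, num_threads
      // = 4, and row_block_size = 32, the segment starts are 0, 224, 480,
      // and 736, since e.g. (1 * 1000) / 4 = 250 floors to 7 * 32 = 224.
      // Only the last thread's segment can end off a block boundary.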

      // The data block into which we write everything
      row_data_block block_output;

      // Set up a buffered block of each of the columns
      std::vector<std::vector<flexible_type> > buffers(rm.total_num_columns);

      // The row index map; refilled for each block.
      std::vector<size_t> row2data_idx_map;

      // Loop over blocks of rows, writing each one.
      size_t block_row_index_start = segment_row_index_start;
      DASSERT_EQ(block_row_index_start % row_block_size, 0);

      while(block_row_index_start != segment_row_index_end) {

        size_t block_row_index_end =
            std::min(segment_row_index_end,
                     block_row_index_start + row_block_size);

        size_t block_size = (block_row_index_end - block_row_index_start);

        if(block_size != row_block_size) {
          DASSERT_LT(block_size, row_block_size);
          DASSERT_EQ(segment_row_index_end, num_rows);
        }

        // Read all of this block's rows into the column buffers.  Each
        // reader should return the full block; we verify this in debug
        // builds below.
        for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {

          // Skip the untranslated columns.
          if(rm.metadata_vect[c_idx]->is_untranslated_column())
            continue;

#ifndef NDEBUG
          size_t n_rows_returned =
#endif
              column_readers[c_idx]->read_rows(row_lb + block_row_index_start,
                                               row_lb + block_row_index_end,
                                               buffers[c_idx]);

          DASSERT_EQ(n_rows_returned, block_size);
        }

        // Translate the buffered columns into the packed row block.
        size_t max_row_size =
            fill_row_buffer_from_column_buffer( row2data_idx_map,
                                                block_output,
                                                rm, buffers,
                                                thread_idx,
                                                track_statistics,
                                                immutable_metadata,
                                                mva);
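
        // (fill_row_buffer_from_column_buffer maps values through the
        // column indexers, updates statistics when requested, and
        // returns the largest packed row size seen in this block.)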

        max_row_size_by_segment[thread_idx] = std::max(max_row_size,
                                                       max_row_size_by_segment[thread_idx]);


        // Write the output.
        auto& it_out = output_iterators[thread_idx];
        *it_out = block_output;
        ++it_out;

        // Advance to the next block.
        block_row_index_start = block_row_index_end;

      } // End loop over buffered blocks

    }); // End parallel looping

  // Close the output writers.
  data_blocks->close();

  DASSERT_EQ(data_blocks->size(), ceil_divide(num_rows, row_block_size));
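  // (This holds because every segment boundary except the final one falls
  // on a multiple of row_block_size, so only the very last block can be
  // short.)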

  // Finalize the lookups and statistics
  indexer_finalizer.execute_and_clear();

  if(track_statistics)
    statistics_finalizer.execute_and_clear();

  // Set the overall max row size.  Subtract one if the target is
  // present, since the row size should not include the target.
  _max_row_size = *std::max_element(max_row_size_by_segment.begin(),
                                    max_row_size_by_segment.end());

  if(rm.has_target)
    _max_row_size -= 1;
}