in src/ml/ml_data/ml_data.cpp [328:643]
/**
 * Translates rows [row_bounds.first, row_bounds.second) of raw_data into
 * row_data_block entries stored in data_blocks, updating the column
 * indexers (and, optionally, column statistics) along the way.
 *
 * \param raw_data           Source data.  Every column named in
 *                           rm.metadata_vect (and the target, if present)
 *                           is read from it.
 * \param immutable_metadata Forwarded to fill_row_buffer_from_column_buffer.
 * \param track_statistics   If true, column statistics are initialized
 *                           before the pass and finalized after it.
 * \param mva                Missing value action, forwarded to
 *                           fill_row_buffer_from_column_buffer.
 * \param row_bounds         Half-open range of raw_data rows to process.
 * \param sorted_columns     Columns whose values are inserted into their
 *                           index in sorted order before the main pass.
 */
void ml_data::_fill_data_blocks(const sframe& raw_data,
                                bool immutable_metadata,
                                bool track_statistics,
                                ml_missing_value_action mva,
                                const std::pair<size_t, size_t>& row_bounds,
                                const std::set<std::string>& sorted_columns) {

  // The metadata must be set up before any data can be translated.
  ASSERT_TRUE(_metadata != nullptr);

  ////////////////////////////////////////////////////////////////////////////////
  // Step 1: No columns at all.  Write a single empty segment and reset the
  // row size so that no stale value survives (same as the zero-row case).

  if(rm.metadata_vect.empty()) {
    data_blocks.reset(new sarray<row_data_block>);
    data_blocks->open_for_write(1);
    data_blocks->close();
    _max_row_size = 0;
    return;
  }

  size_t max_num_threads = thread::cpu_count();

  ////////////////////////////////////////////////////////////////////////////////
  // Step 2: Set up the target and verify its type matches its mode.

  std::shared_ptr<sarray<flexible_type> > target;

  if(rm.has_target) {
    target = raw_data.select_column(_metadata->target_column_name());

    check_type_consistent_with_mode(
        _metadata->target_column_name(), target->get_type(), _metadata->target_column_mode());
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 3: Set up the row bounds.

  const size_t num_rows = row_bounds.second - row_bounds.first;
  const size_t row_lb = row_bounds.first;
  const size_t row_ub = row_bounds.second;

  ////////////////////////////////////////////////////////////////////////////////
  // Step 3.1: Check for an empty sframe.  In this case, just clear
  // everything and exit.  (We need to handle this case explicitly
  // since the user is allowed to pass in sframe() to signal that
  // ml_data is going to be created as an empty data.)

  if(num_rows == 0) {
    data_blocks.reset(new sarray<row_data_block>);
    data_blocks->open_for_write(1);
    data_blocks->close();
    _max_row_size = 0;
    return;
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 4: Set up the input columns.
  //
  // The full metadata has the target as well.  This way we can treat it
  // just like a column.  The bookkeeping here is easier, and it gets
  // unpacked on the ml_data_iterator end.

  std::vector<std::shared_ptr<sarray<flexible_type> > > input_data;
  input_data.reserve(rm.metadata_vect.size());

  for(const auto& m : rm.metadata_vect) {
    input_data.push_back(raw_data.select_column(m->name));
    check_type_consistent_with_mode(m->name, raw_data.column_type(m->name), m->mode);
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 5: Initialize all of the indexing and statistics classes.
  //
  // Each initialize() is paired with a finalize() registered on a
  // scoped_finally, so the finalizers also run if anything below throws.

  scoped_finally indexer_finalizer;
  scoped_finally statistics_finalizer;

  for(const auto& m : rm.metadata_vect) {

    std::shared_ptr<column_indexer> indexer = m->indexer;
    indexer->initialize();
    indexer_finalizer.add([indexer](){indexer->finalize();});

    if(track_statistics) {
      std::shared_ptr<column_statistics> statistics = m->statistics;
      statistics->initialize();
      statistics_finalizer.add([statistics](){statistics->finalize();});
    }
  }

  // Track the maximum row size per thread, for efficient allocation later.
  std::vector<size_t> max_row_size_by_segment(max_num_threads, 0);

  ////////////////////////////////////////////////////////////////////////////////
  // Step 6: Open the readers.  Slow and independent, so do it in parallel.

  // Number of leading rows pre-inserted into each unsorted indexed column.
  const size_t num_preindexed_rows = 10000;

  std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > column_readers(rm.total_num_columns);

  parallel_for(0, rm.total_num_columns, [&](size_t c_idx) {
      const auto& m = rm.metadata_vect[c_idx];

      // Untranslated columns get no reader; they are skipped in the fill below.
      if(!m->is_untranslated_column()) {
        ASSERT_MSG(input_data[c_idx]->is_opened_for_read(),
                   "Input data not properly set up for reading.");

        column_readers[c_idx] = input_data[c_idx]->get_reader();

        // Deterministically insert the values from the first rows into
        // the index, in order.  This makes a number of test cases much
        // easier to write.  Sorted columns are handled in Step 7.
        if(mode_is_indexed(m->mode) && sorted_columns.count(m->name) == 0) {
          std::vector<flexible_type> vv;
          column_readers[c_idx]->read_rows(
              row_lb, std::min(row_lb + num_preindexed_rows, row_ub), vv);
          m->indexer->insert_values_into_index(vv);
        }
      }
    });

  ////////////////////////////////////////////////////////////////////////////////
  // Step 7: For each sorted column, collect all the distinct values with a
  // scratch indexer, sort them, then insert them into the real indexer in
  // sorted order.  Could be optimized.

  // Rows read per read_rows() call while scanning sorted columns.
  const size_t sorted_scan_chunk_size = 4096;

  if(!sorted_columns.empty()) {
    for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const auto& m = rm.metadata_vect[c_idx];

      // Untranslated columns have no reader (see Step 6), so there is
      // nothing to scan; dereferencing their null reader would crash.
      if(sorted_columns.count(m->name) == 0 || m->is_untranslated_column())
        continue;

      column_indexer _idxr(m->name, m->mode, m->original_column_type);

      {
        _idxr.initialize();
        scoped_finally idx_fin([&](){_idxr.finalize();});

        // Add everything to the scratch index.  Each thread scans only
        // its own [start_idx, end_idx) slice of the rows.
        in_parallel([&](size_t thread_idx, size_t num_threads) {
            size_t start_idx = row_lb + (thread_idx * num_rows) / num_threads;
            size_t end_idx = row_lb + ((thread_idx + 1) * num_rows) / num_threads;

            std::vector<flexible_type> vv;

            for(size_t row_idx = start_idx; row_idx < end_idx; row_idx += sorted_scan_chunk_size) {
              // Clamp to this thread's slice end, not the global bound, so
              // threads do not re-read rows owned by the next thread.
              column_readers[c_idx]->read_rows(
                  row_idx, std::min(end_idx, row_idx + sorted_scan_chunk_size), vv);

              for(const auto& v : vv) {
                _idxr.map_value_to_index(thread_idx, v);
              }
            }
          });
      }

      // Pull out the values.
      std::vector<flexible_type> values = _idxr.reset_and_return_values();

      // Sort: group by type first, then by value within a type.
      // UNDEFINED values compare equal to each other.
      std::sort(values.begin(), values.end(),
                [](const flexible_type& v1, const flexible_type& v2) {
                  if(UNLIKELY(v1.get_type() != v2.get_type())) {
                    return (int(v1.get_type()) < int(v2.get_type()));
                  } else if(UNLIKELY(v1.get_type() == flex_type_enum::UNDEFINED)) {
                    return false;
                  } else {
                    return v1 < v2;
                  }
                });

      // Insert.
      m->indexer->insert_values_into_index(values);
    }
  }

  ////////////////////////////////////////////////////////////////////////////////
  // Step 8: Set the number of rows in each row block and open the output
  // writers, one output segment per thread.

  row_block_size = estimate_row_block_size(num_rows, rm, column_readers);

  data_blocks.reset(new sarray<row_data_block>);

  size_t num_output_segments = max_num_threads;
  data_blocks->open_for_write(num_output_segments);

  std::vector<typename sarray<row_data_block>::iterator> output_iterators(num_output_segments);

  for(size_t i = 0; i < output_iterators.size(); ++i)
    output_iterators[i] = data_blocks->get_output_iterator(i);

  ////////////////////////////////////////////////////////////////////////////////
  // Step 9: Run through the original data in parallel, mapping indices and
  // tracking everything as needed for the metadata statistics.

  in_parallel([&](size_t thread_idx, size_t num_threads) {

      // output_iterators and max_row_size_by_segment are sized by
      // cpu_count(); each thread writes only to its own slot.
      DASSERT_LT(thread_idx, num_output_segments);

      // Set up start points for the segments.  To make the indexing sane,
      // the rows are stored in blocks of row_block_size rows.  This must be
      // true across segments.  Thus we must set up the row indexing so that
      // each thread starts processing at one of the block boundaries.

      // Equal to flooring (i * num_rows / num_threads) to the nearest
      // multiple of row_block_size.
      size_t segment_row_index_start =
          row_block_size * size_t( ((thread_idx * num_rows) / num_threads)
                                   / row_block_size);

      // The last thread takes everything up to num_rows, so only the final
      // block of the whole array may be shorter than row_block_size.
      size_t segment_row_index_end =
          ( (thread_idx == num_threads - 1)
            ? num_rows
            : row_block_size * size_t( (((thread_idx + 1) * num_rows)
                                        / num_threads) / row_block_size));

      // The data block into which we write everything.
      row_data_block block_output;

      // A buffered block of each of the columns.
      std::vector<std::vector<flexible_type> > buffers(rm.total_num_columns);

      // The row index map; gets filled with every block.
      std::vector<size_t> row2data_idx_map;

      // Loop over blocks of rows, writing each one.
      size_t block_row_index_start = segment_row_index_start;

      DASSERT_EQ(block_row_index_start % row_block_size, 0);

      while(block_row_index_start != segment_row_index_end) {

        size_t block_row_index_end =
            std::min(segment_row_index_end,
                     block_row_index_start + row_block_size);

        size_t block_size = (block_row_index_end - block_row_index_start);

        if(block_size != row_block_size) {
          DASSERT_LT(block_size, row_block_size);
          DASSERT_EQ(segment_row_index_end, num_rows);
        }

        // Read this block's rows from every translated column into the
        // buffers.  Every read is expected to return exactly block_size
        // rows (checked in debug builds).
        for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {

          // Skip the untranslated columns; they have no reader.
          if(rm.metadata_vect[c_idx]->is_untranslated_column())
            continue;

#ifndef NDEBUG
          size_t n_rows_returned =
#endif
              column_readers[c_idx]->read_rows(row_lb + block_row_index_start,
                                               row_lb + block_row_index_end,
                                               buffers[c_idx]);

          DASSERT_EQ(n_rows_returned, block_size);
        }

        // Translate the column buffers into the row block.
        size_t max_row_size =
            fill_row_buffer_from_column_buffer( row2data_idx_map,
                                                block_output,
                                                rm, buffers,
                                                thread_idx,
                                                track_statistics,
                                                immutable_metadata,
                                                mva);

        max_row_size_by_segment[thread_idx] = std::max(max_row_size,
                                                       max_row_size_by_segment[thread_idx]);

        // Write the output to this thread's own segment.
        auto& it_out = output_iterators[thread_idx];
        *it_out = block_output;
        ++it_out;

        // Advance to the next block.
        block_row_index_start = block_row_index_end;

      } // End loop over buffered blocks
    }); // End parallel looping

  ////////////////////////////////////////////////////////////////////////////////
  // Step 10: Close the writers and finalize the lookups and statistics.

  data_blocks->close();

  DASSERT_EQ(data_blocks->size(), ceil_divide(num_rows, row_block_size));

  indexer_finalizer.execute_and_clear();

  if(track_statistics)
    statistics_finalizer.execute_and_clear();

  // Set the overall max row size.  Subtract one if the target is present,
  // as the row size does not include that.  Guard against wrapping around
  // to SIZE_MAX if no row contributed any entries.
  _max_row_size = *std::max_element(max_row_size_by_segment.begin(),
                                    max_row_size_by_segment.end());

  if(rm.has_target && _max_row_size > 0)
    _max_row_size -= 1;
}