in src/toolkits/ml_data_2/ml_data_setup.cpp [180:625]
void ml_data::_fill_data_blocks(bool in_training_mode) {
// Check to make sure that we are
ASSERT_TRUE(incoming_data != nullptr);
ASSERT_TRUE(_metadata != nullptr);
if(rm.metadata_vect.empty()) {
data_blocks.reset(new sarray<row_data_block>);
data_blocks->open_for_write(1);
data_blocks->close();
return;
}
////////////////////////////////////////////////////////////////////////////////
// Step 1: Set up all the variables relevant to controlling the
// filling.
size_t max_num_threads = thread::cpu_count();
/////////////////////////////////////////////////////////////
// Step 1.1: Set up the other binary variables here
bool track_statistics = in_training_mode ? true : false;
bool immutable_metadata = (!in_training_mode) && incoming_data->immutable_metadata;
////////////////////////////////////////////////////////////
// Step 1.2: Set the missing value (flex_type_enum::UNDEFINED)
// action.
missing_value_action none_action = get_missing_value_action(_metadata->options, in_training_mode);
////////////////////////////////////////////////////////////
// Step 1.3: Set up the creation flags.
const bool shuffle_output_data = _metadata->options.at("shuffle_rows");
const bool sort_by_first_two_columns_always
= _metadata->options.at("sort_by_first_two_columns");
const bool sort_by_first_two_columns_on_train
= _metadata->options.at("sort_by_first_two_columns_on_train");
const bool sort_by_first_two_columns
= (sort_by_first_two_columns_always
|| (in_training_mode && sort_by_first_two_columns_on_train));
// Perform checks appropriate for the flags
if(sort_by_first_two_columns) {
ASSERT_MSG(_metadata->column_mode(0) == ml_column_mode::CATEGORICAL,
"Mode of first column must be categorical for sorted_output to apply.");
ASSERT_MSG(_metadata->column_mode(1)== ml_column_mode::CATEGORICAL,
"Mode of second column must be categorical for sorted_output to apply.");
}
////////////////////////////////////////////////////////////////////////////////
// Step 2: Set up the target.
std::shared_ptr<sarray<flexible_type> > target;
const sframe& raw_data = incoming_data->data;
if(rm.has_target) {
target = raw_data.select_column(_metadata->target_column_name());
check_type_consistent_with_mode(
_metadata->target_column_name(), target->get_type(), _metadata->target_column_mode());
}
////////////////////////////////////////////////////////////////////////////////
// Step 3: Set up the row sizes
_row_start = 0;
_row_end = raw_data.num_rows();
const size_t num_rows = _row_end - _row_start;
////////////////////////////////////////////////////////////////////////////////
// Step 3.1: Check for an empty sframe. In this case, just clear
// everything and exit. (We need to handle this case explicitly
// since the user is allowed to pass in sframe() to signal that
// ml_data is going to be created as an empty data.
if(num_rows == 0) {
data_blocks.reset(new sarray<row_data_block>);
data_blocks->open_for_write(1);
data_blocks->close();
_max_row_size = 0;
return;
}
////////////////////////////////////////////////////////////////////////////////
// Step 4: Set up metadata and input iterators
// Set up all the proper data input sources for iterating through the data.
// The full metadata has the target as well. This way we can treat it just
// like a column. The bookkeeping here is easier, and it gets unpacked on
// ml_data_iterator end.
std::vector<std::shared_ptr<sarray<flexible_type> > > input_data;
for(const auto& m : rm.metadata_vect) {
input_data.push_back(raw_data.select_column(m->name));
check_type_consistent_with_mode(m->name, raw_data.column_type(m->name), m->mode);
}
////////////////////////////////////////////////////////////////////////////////
// Step 5: Initialize all of the indexing and statistics classes
scoped_finally indexer_finalizer;
scoped_finally statistics_finalizer;
// Initialize the metadata and key parts of ml_data
//
// The problem is that we always must make sure that things
for(const auto& m : rm.metadata_vect) {
std::shared_ptr<column_indexer> indexer = m->indexer;
indexer->initialize();
indexer_finalizer.add([indexer](){indexer->finalize();});
if(track_statistics) {
std::shared_ptr<column_statistics> statistics = m->statistics;
statistics->initialize();
statistics_finalizer.add([statistics](){statistics->finalize();});
}
}
// Track the maximum row size; simple front loading of things for
// efficient allocation.
std::vector<size_t> max_row_size_by_segment(max_num_threads, 0);
////////////////////////////////////////////////////////////////////////////////
// Step 5: Open the readers. Slow and independent, so do it in
// parallel.
std::vector<std::shared_ptr<sarray<flexible_type>::reader_type> > column_readers(rm.total_num_columns);
parallel_for(0, rm.total_num_columns, [&](size_t c_idx) {
if(!rm.metadata_vect[c_idx]->is_untranslated_column()) {
ASSERT_MSG(input_data[c_idx]->is_opened_for_read(),
"Input data not properly set up for reading.");
column_readers[c_idx] = input_data[c_idx]->get_reader();
// If we don't have that many rows, deterministically insert the
// values into the index in order. This makes a number of
// test cases much easier to write.
if(!shuffle_output_data && !immutable_metadata) {
std::vector<flexible_type> vv;
column_readers[c_idx]->read_rows(0, std::min<size_t>(10000, num_rows), vv);
rm.metadata_vect[c_idx]->indexer->insert_values_into_index(vv);
}
}
});
////////////////////////////////////////////////////////////////////////////////
// Step 6: Prepare the shuffling, if needed.
//
// If the output data needs to be shuffled, AND the output needs to
// be sorted, then the only way we do the shuffling is to index the
// first two columns in random order. As the sorting is done on the
// resulting indices, the net result will be that the original data
// is shuffled widely on the far end.
if(shuffle_output_data && sort_by_first_two_columns) {
const size_t shuffle_block_size = 32*1024;
// 1. Choose blocks of 32K indices; at each step, pick one of the
// 32K index blocks at random, then add those values in random
// order.
std::vector<size_t> blocks(ceil_divide(num_rows, shuffle_block_size));
for(size_t i = 0; i < blocks.size(); ++i)
blocks[i] = i;
random::shuffle(blocks.begin(), blocks.end());
for(size_t col_idx : {0, 1}) {
auto& col_reader = column_readers[col_idx];
in_parallel([&](size_t thread_idx, size_t num_threads) {
size_t block_idx_run_start = (thread_idx * blocks.size()) / num_threads;
size_t block_idx_run_end = ((thread_idx + 1) * blocks.size()) / num_threads;
std::vector<flexible_type> col_data;
for(size_t block_idx = block_idx_run_start; block_idx < block_idx_run_end; ++block_idx) {
size_t row_idx_start = blocks[block_idx] * shuffle_block_size;
size_t row_idx_end = std::min(num_rows, (blocks[block_idx] + 1) * shuffle_block_size);
col_reader->read_rows(row_idx_start, row_idx_end, col_data);
random::shuffle(col_data.begin(), col_data.end());
for(const flexible_type& f : col_data)
rm.metadata_vect[col_idx]->indexer->insert_values_into_index(f);
}
});
}
}
////////////////////////////////////////////////////////////////////////////////
// Set the number of rows in each row block.
row_block_size = estimate_row_block_size(
num_rows, rm, column_readers);
// Open the output writers. In the case of shuffled output, create
// more segments than we need, then write each block to a random
// segment. This fulfills a pretty decent type of shuffling.
data_blocks.reset(new sarray<row_data_block>);
size_t num_output_segments;
if(shuffle_output_data && !sort_by_first_two_columns) {
num_output_segments = std::max(size_t(13), 2*max_num_threads + 3);
} else {
num_output_segments = max_num_threads;
}
data_blocks->open_for_write(num_output_segments);
std::vector<typename sarray<row_data_block>::iterator> output_iterators(num_output_segments);
std::vector<mutex> output_iterator_locks(num_output_segments);
for(size_t i = 0; i < output_iterators.size(); ++i)
output_iterators[i] = data_blocks->get_output_iterator(i);
// If we are shuffling the data, the final_shuffled_block_saved_row
// needs to be held back and put at the end, as it isn't full
// length. This holds this value; it gets set.
row_data_block final_shuffled_block_saved_row;
// Run through the original data in parallel, mapping indices and tracking
// everything as needed for the metadata statistics.
in_parallel([&](size_t thread_idx, size_t num_threads) {
// Set up start points for the segments. To make the indexing sane, the
// rows are stored in blocks of row_block_size rows. This must be true
// across segments. Thus we must set up the row indexing so that each
// thread starts processing at one of the block boundaries.
// Equal to flooring (i * num_rows / num_threads)
// to the nearest multiple of row_block_size.
size_t segment_row_index_start =
row_block_size * size_t( ((thread_idx * num_rows) / num_threads)
/ row_block_size);
size_t segment_row_index_end =
( (thread_idx == num_threads - 1)
? num_rows
: row_block_size * size_t( (((thread_idx + 1) * num_rows)
/ num_threads) / row_block_size));
// The data block into which we write everything
row_data_block block_output;
// Set up a buffered block of each of the columns
std::vector<std::vector<flexible_type> > buffers(rm.total_num_columns);
// The row index map; gets filled with every block.
std::vector<size_t> row2data_idx_map;
// The index remapping; this allows the use of sorting and
// shuffling to reorder rows within a block.
std::vector<size_t> index_remapping;
std::vector<std::pair<size_t, size_t> > column_sorting_values;
// Loop over blocks of rows, writing each one.
size_t block_row_index_start = segment_row_index_start;
DASSERT_EQ(block_row_index_start % row_block_size, 0);
while(block_row_index_start != segment_row_index_end) {
size_t block_row_index_end =
std::min(segment_row_index_end,
block_row_index_start + row_block_size);
size_t block_size = (block_row_index_end - block_row_index_start);
if(block_size != row_block_size) {
DASSERT_LT(block_size, row_block_size);
DASSERT_EQ(segment_row_index_end, num_rows);
}
// Read all the rows into the buffer. If we can't read in every one
// from all the columns (????), then, we read in as much as we can and
// then treat that amount as the block.
for(size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
// skip the untranslated_columns columns
if(rm.metadata_vect[c_idx]->is_untranslated_column())
continue;
#ifndef NDEBUG
size_t n_rows_returned =
#endif
column_readers[c_idx]->read_rows(block_row_index_start,
block_row_index_end,
buffers[c_idx]);
DASSERT_EQ(n_rows_returned, block_size);
}
// Set up an index mapping as needed.
if(sort_by_first_two_columns) {
// index these two rows as needed to get the sorted values.
// This may slow things down slightly, but currently it
// isn't the bottleneck.
column_sorting_values.resize(block_size);
for(size_t r_idx = 0; r_idx < block_size; ++r_idx) {
size_t c1_idx, c2_idx;
if(!immutable_metadata) {
c1_idx = rm.metadata_vect[0]->indexer->map_value_to_index(thread_idx, buffers[0][r_idx]);
c2_idx = rm.metadata_vect[1]->indexer->map_value_to_index(thread_idx, buffers[1][r_idx]);
} else {
c1_idx = rm.metadata_vect[0]->indexer->immutable_map_value_to_index(buffers[0][r_idx]);
c2_idx = rm.metadata_vect[1]->indexer->immutable_map_value_to_index(buffers[1][r_idx]);
}
column_sorting_values[r_idx] = {c1_idx, c2_idx};
}
if(index_remapping.size() != block_size) {
index_remapping.resize(block_size);
for(size_t k = 0; k < block_size; ++k)
index_remapping[k] = k;
}
// Get the index_mapping for a sorted list.
std::sort(index_remapping.begin(), index_remapping.end(),
[&column_sorting_values](size_t i1, size_t i2) {
return column_sorting_values[i1] < column_sorting_values[i2];
});
} else if(shuffle_output_data) {
// Shuffle the index map.
if(index_remapping.size() != block_size) {
index_remapping.resize(block_size);
for(size_t k = 0; k < block_size; ++k)
index_remapping[k] = k;
}
random::shuffle(index_remapping.begin(), index_remapping.end());
}
// Run the buffer
size_t max_row_size =
fill_row_buffer_from_column_buffer( row2data_idx_map,
block_output,
rm, buffers,
thread_idx,
track_statistics,
immutable_metadata,
none_action,
index_remapping);
max_row_size_by_segment[thread_idx] = std::max(max_row_size,
max_row_size_by_segment[thread_idx]);
// Write the output block to one of the segments.
if(shuffle_output_data && !sort_by_first_two_columns) {
if(block_size == row_block_size) {
while(true) {
// If it's a full block, write it to a random location.
size_t write_out_segment = random::fast_uniform<size_t>(0, output_iterators.size()-1);
if(output_iterator_locks[write_out_segment].try_lock()) {
auto& it_out = output_iterators[write_out_segment];
*it_out = block_output;
++it_out;
output_iterator_locks[write_out_segment].unlock();
break;
}
}
} else {
// If it's not a full block, save it for later -- it needs
// to be written last, to the final block.
// Also, this will only happen in the final thread.
DASSERT_TRUE(final_shuffled_block_saved_row.entry_data.empty());
DASSERT_EQ(thread_idx, num_threads-1);
final_shuffled_block_saved_row = block_output;
}
} else {
auto& it_out = output_iterators[thread_idx];
*it_out = block_output;
++it_out;
}
// Increment
block_row_index_start = block_row_index_end;
} // End loop over buffered blocks
}); // End parallel looping
// Do we have a residual last block? If so write it out to the last
// segment.
if(shuffle_output_data && !final_shuffled_block_saved_row.entry_data.empty()) {
auto& it_out = output_iterators.back();
*it_out = final_shuffled_block_saved_row;
++it_out;
}
// Close the data out writers
data_blocks->close();
DASSERT_EQ(data_blocks->size(), ceil_divide(num_rows, row_block_size));
// Finalize the lookups and statistics
indexer_finalizer.execute_and_clear();
if(track_statistics)
statistics_finalizer.execute_and_clear();
// Set overall max row size. Subtract one if the target is present,
// as the row size does not include that.
_max_row_size = *std::max_element(max_row_size_by_segment.begin(),
max_row_size_by_segment.end());
if(rm.has_target)
_max_row_size -= 1;
}