in src/ml/ml_data/data_storage/ml_data_row_format.cpp [385:815]
/**
 * Packs a block of column-wise values into the row-wise entry_value
 * layout stored in block_output.entry_data.
 *
 * For each row, the following is appended to block_output.entry_data, in
 * column order:
 *   - if !rm.data_size_is_constant, one leading slot that is back-filled
 *     with the number of entry_data slots occupied by the row (the leading
 *     slot itself included);
 *   - NUMERIC: one double;
 *   - NUMERIC_VECTOR / NUMERIC_ND_VECTOR: one double per element (for a
 *     missing cell, m->fixed_column_size() substitute values);
 *   - CATEGORICAL: one index obtained from the column indexer (the cell,
 *     missing or not, is handed straight to the indexer);
 *   - CATEGORICAL_VECTOR: a size slot followed by the mapped indices in
 *     ascending order (size 0 for a missing cell);
 *   - DICTIONARY: a size slot followed by (index, double) slot pairs in
 *     ascending index order (size 0 for a missing cell).
 * Untranslated columns contribute nothing and their (empty) buffers are
 * never read.
 *
 * \param row2data_idx_map  Output. Resized to the number of rows in the
 *                          block; entry r receives the offset in
 *                          block_output.entry_data at which row r starts.
 * \param block_output      Output. Its entry_data is cleared and refilled.
 * \param rm                Row metadata: supplies total_num_columns,
 *                          metadata_vect, data_size_is_constant and
 *                          constant_data_size.
 * \param column_buffers    One buffer per column. Debug builds assert that
 *                          untranslated columns have empty buffers and that
 *                          all other buffers share the same length.
 * \param thread_idx        Forwarded to the indexer and statistics calls.
 * \param track_statistics  When true, the per-column statistics objects are
 *                          updated with the values seen. Debug builds
 *                          assert that immutable_metadata is false.
 * \param immutable_metadata When true, immutable_map_value_to_index() is
 *                          used for lookups instead of map_value_to_index().
 * \param none_action       Policy for missing values. For numeric slots:
 *                          ERROR throws, IMPUTE writes m_stats->mean(i),
 *                          USE_NAN writes NAN. For a missing categorical
 *                          vector or dictionary cell: ERROR throws, any
 *                          other action writes an empty entry.
 *
 * \return The largest per-row element count in the block, where every
 *         index, value, or index/value pair written counts as one element
 *         and size slots are not counted. Returns 0 when every column is
 *         untranslated.
 */
size_t fill_row_buffer_from_column_buffer(
    std::vector<size_t>& row2data_idx_map, row_data_block& block_output,
    const row_metadata& rm,
    const std::vector<std::vector<flexible_type> >& column_buffers,
    size_t thread_idx, bool track_statistics, bool immutable_metadata,
    ml_missing_value_action none_action) {
  if (track_statistics) {
    DASSERT_MSG(!immutable_metadata,
                "Dynamic metadata must be allowed if statistics are tracked.");
  }

#ifndef NDEBUG
  // Debug-only sanity check of the input shape: one buffer per column,
  // empty buffers for untranslated columns, equal lengths for the rest.
  DASSERT_EQ(rm.total_num_columns, column_buffers.size());
  {
    size_t check_column_buffer_size = 0;
    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      if (!rm.metadata_vect[c_idx]->is_untranslated_column()) {
        check_column_buffer_size = column_buffers[c_idx].size();
        break;
      }
    }
    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const auto& c = column_buffers[c_idx];
      if (rm.metadata_vect[c_idx]->is_untranslated_column())
        DASSERT_EQ(c.size(), 0);
      else
        DASSERT_EQ(c.size(), check_column_buffer_size);
    }
  }
#endif

  // How many rows in the block?  Taken from the first translated column.
  size_t block_size = size_t(-1);
  for (size_t c_idx = 0; c_idx < column_buffers.size(); ++c_idx) {
    if (!rm.metadata_vect[c_idx]->is_untranslated_column()) {
      block_size = column_buffers[c_idx].size();
      break;
    }
  }

  /** If they are all untranslated columns, then we're done here... */
  if (block_size == size_t(-1)) return 0;

  DASSERT_TRUE(block_size != 0);

  // Set up the row2data_idx_map
  row2data_idx_map.resize(block_size);

  // Scratch buffers reused across cells for the indexing / statistics calls.
  std::vector<size_t> index_vect;
  std::vector<double> value_vect;
  std::vector<std::pair<size_t, double> > idx_value_vect;

  // Indices of dictionary entries whose value was missing.  Only populated
  // while statistics are tracked, and always emptied again before the next
  // dictionary cell is processed.
  std::vector<size_t> exclusion_indices;

  size_t max_row_size = 0;

  block_output.entry_data.clear();
  if (rm.data_size_is_constant)
    block_output.entry_data.reserve(rm.constant_data_size * block_size);

  for (size_t row_buffer_index = 0; row_buffer_index < block_size;
       ++row_buffer_index) {
    ////////////////////////////////////////////////////////////////////////////////
    // Get ready to fill this by defining the row size

    size_t row_size = 0;

    // If the rows are not a constant size, then this writes out the
    // proper row size as the first element.
    size_t n_row_elements_write_index = size_t(-1);

    // Record the index of the start of this row.
    row2data_idx_map[row_buffer_index] = block_output.entry_data.size();

    if (!rm.data_size_is_constant) {
      // Placeholder slot; back-filled once the row has been written.
      n_row_elements_write_index = block_output.entry_data.size();
      entry_value ev;
      block_output.entry_data.push_back(ev);
    }

    // Okay, now go through the columns and unpack the values.  Make
    // sure they are written correctly.
    for (size_t c_idx = 0; c_idx < rm.total_num_columns; ++c_idx) {
      const column_metadata_ptr& m = rm.metadata_vect[c_idx];

      // Untranslated columns carry an empty buffer and produce no output.
      // Skip them before indexing into the buffer: reading element
      // row_buffer_index of an empty vector is out of range.
      if (m->is_untranslated_column()) continue;

      const flexible_type& v = column_buffers[c_idx][row_buffer_index];
      const std::shared_ptr<column_indexer>& m_idx = m->indexer;
      const std::shared_ptr<column_statistics>& m_stats = m->statistics;

      /** Call this to write out an index.
       */
      auto write_index = [&](size_t index) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.index_value = index;
        block_output.entry_data.push_back(ev);
      };

      /** Call this to write out a value.
       */
      auto write_value = [&](double value) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.double_value = value;
        block_output.entry_data.push_back(ev);
      };

      /** Call this to write out a size.  Size slots are bookkeeping and
       *  are deliberately not counted in row_size.
       */
      auto write_size = [&](size_t size) GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        entry_value ev;
        ev.index_value = size;
        block_output.entry_data.push_back(ev);
      };

      /** Call this to write out an index/double pair.  The pair takes
       *  two slots in entry_data but counts as a single row element.
       */
      auto write_index_value_pair = [&](std::pair<size_t, double> p)
          GL_GCC_ONLY(GL_HOT_INLINE_FLATTEN) {
        ++row_size;
        entry_value ev;
        ev.index_value = p.first;
        block_output.entry_data.push_back(ev);
        ev.double_value = p.second;
        block_output.entry_data.push_back(ev);
      };

      /** Call this to throw a NA error.
       */
      auto bad_missing_value_encountered = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
        log_and_throw(std::string("Missing value (None) encountered in ") +
                      "column '" + m->name + "'. " +
                      "Use the SFrame's dropna function to drop rows with "
                      "'None' values in them.");
      };

      /** Use this to retrieve a missing numeric value.
       */
      auto get_missing_numeric_value =
          [&](size_t feature_index) -> double GL_GCC_ONLY(GL_HOT_NOINLINE) {
        switch (none_action) {
          case ml_missing_value_action::ERROR:
            bad_missing_value_encountered();
            return 0;
          case ml_missing_value_action::IMPUTE:
            return m_stats->mean(feature_index);
          case ml_missing_value_action::USE_NAN:
            return NAN;
          default:
            ASSERT_TRUE(false);
            return 0;
        }
      };

      /** Use this to check that missing dicts and missing
       *  categorical_vectors are handled correctly.
       */
      auto verify_missing_categoricals_okay = [&]() GL_GCC_ONLY(GL_HOT_INLINE) {
        switch (none_action) {
          case ml_missing_value_action::ERROR:
            bad_missing_value_encountered();
          default:
            return;
        }
      };

      ////////////////////////////////////////////////////////////////////////////////
      // Step 3: Go through and write out the data using the above functions

      switch (m->mode) {
        case ml_column_mode::NUMERIC: {
          double v_d;

          // Missing value entries for numeric columns.
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            v_d = get_missing_numeric_value(0);
          } else {
            v_d = v;

            /** Update Statistics **/
            if (track_statistics) {
              value_vect = {v_d};
              m_stats->update_numeric_statistics(thread_idx, value_vect);
            }
          }

          /** Write out the data in the proper format. **/
          write_value(v_d);
          break;
        }

        case ml_column_mode::NUMERIC_VECTOR:
        case ml_column_mode::NUMERIC_ND_VECTOR: {
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            // Top level None: An entire entry is missing.
            size_t n_values = m->fixed_column_size();
            for (size_t k = 0; k < n_values; ++k) {
              write_value(get_missing_numeric_value(k));
            }
          } else if (v.get_type() == flex_type_enum::VECTOR) {
            const std::vector<double>& feature_vect = v.get<flex_vec>();

            m->check_fixed_column_size(v);

            /** Write out the data in the proper format. **/
            size_t n_values = feature_vect.size();
            for (size_t k = 0; k < n_values; ++k) {
              write_value(feature_vect[k]);
            }

            /** Update Statistics **/
            if (track_statistics) {
              m_stats->update_numeric_statistics(thread_idx, feature_vect);
            }
          } else if (v.get_type() == flex_type_enum::ND_VECTOR) {
            const flex_nd_vec& _a = v.get<flex_nd_vec>();
            flex_nd_vec _a_canonical;  // Temporary to make sure the
                                       // references stay valid
            const flex_nd_vec& a =
                _a.is_canonical() ? _a : (_a_canonical = _a.canonicalize());

            // Validate the size before emitting anything, as the VECTOR
            // branch above does.
            m->check_fixed_column_size(v);

            /** Write out the data in the proper format. **/
            size_t n_values = a.num_elem();
            for (size_t i = 0; i < n_values; ++i) {
              write_value(a[i]);
            }

            /** Update Statistics **/
            if (track_statistics) {
              m_stats->update_numeric_statistics(thread_idx, a.raw_elements());
            }
          } else {
            ASSERT_TRUE(false);
          }
          break;
        }

        case ml_column_mode::CATEGORICAL: {
          /** Map out the value. **/
          // Missing values are always treated as their own category.
          size_t index;
          if (!immutable_metadata) {
            index = m_idx->map_value_to_index(thread_idx, v);
          } else {
            index = m_idx->immutable_map_value_to_index(v);
          }

          /** Write out the data in the proper format. **/
          write_index(index);

          /** Update Statistics **/
          if (track_statistics) {
            index_vect = {index};
            m_stats->update_categorical_statistics(thread_idx, index_vect);
          }
          break;
        }

        case ml_column_mode::CATEGORICAL_VECTOR: {
          // Top level categorical vector is missing.
          if (UNLIKELY(v.get_type() == flex_type_enum::UNDEFINED)) {
            verify_missing_categoricals_okay();
            write_size(0);
          } else {
            const flex_list& vv = v.get<flex_list>();
            size_t n_values = vv.size();
            index_vect.resize(n_values);

            for (size_t k = 0; k < n_values; ++k) {
              if (!immutable_metadata) {
                index_vect[k] = m_idx->map_value_to_index(thread_idx, vv[k]);
              } else {
                index_vect[k] = m_idx->immutable_map_value_to_index(vv[k]);
              }
            }

            // Now, we want to sort the indices; this permits easy
            // filling of an Eigen sparse vector when the data is
            // loaded, as we can insert it by index order.
            std::sort(index_vect.begin(), index_vect.end());

            /** Write out the data in the proper format. **/
            write_size(n_values);
            for (size_t k = 0; k < n_values; ++k) write_index(index_vect[k]);

            /** Update Statistics **/
            if (track_statistics)
              m_stats->update_categorical_statistics(thread_idx, index_vect);
          }
          break;
        }

        case ml_column_mode::DICTIONARY: {
          DASSERT_TRUE(exclusion_indices.empty());

          // Top level dictionary is missing
          if (v.get_type() == flex_type_enum::UNDEFINED) {
            verify_missing_categoricals_okay();
            write_size(0);
          } else {
            const flex_dict& dv = v.get<flex_dict>();
            size_t n_values = dv.size();
            idx_value_vect.resize(n_values);

            for (size_t k = 0; k < n_values; ++k) {
              const std::pair<flexible_type, flexible_type>& kvp = dv[k];

              size_t index;
              if (!immutable_metadata) {
                index = m_idx->map_value_to_index(thread_idx, kvp.first);
              } else {
                index = m_idx->immutable_map_value_to_index(kvp.first);
              }

              // Initialised only to keep the compiler quiet on the
              // throwing path below; every non-throwing path assigns it.
              double value = 0;
              if (kvp.second.get_type() == flex_type_enum::INTEGER ||
                  kvp.second.get_type() == flex_type_enum::FLOAT) {
                value = kvp.second;
              } else if (kvp.second.get_type() == flex_type_enum::UNDEFINED) {
                value = get_missing_numeric_value(index);

                // The exclusion list is consumed (and cleared) only by
                // the statistics update below.  Recording it when
                // statistics are off would leave stale entries behind
                // for the next dictionary cell.
                if (track_statistics) exclusion_indices.push_back(index);
              } else {
                auto throw_error = [&]() GL_GCC_ONLY(GL_COLD_NOINLINE) {
                  log_and_throw(std::string("Dictionary value for key '") +
                                std::string(kvp.first) + "' in column '" +
                                m->name + "' is not numeric.");
                };
                throw_error();
              }

              idx_value_vect[k] = {index, value};
            }

            // Now, we want to sort the dictionaries by index; this
            // permits easy filling of an Eigen sparse vector when
            // the data is loaded, as we can insert it by index
            // order.
            std::sort(idx_value_vect.begin(), idx_value_vect.end());

            /** Write out the data in the proper format. **/
            write_size(n_values);
            for (size_t k = 0; k < n_values; ++k)
              write_index_value_pair(idx_value_vect[k]);

            /** Update Statistics **/
            if (track_statistics) {
              if (!exclusion_indices.empty()) {
                // Efficiently remove the values recorded above as missing,
                // so substituted values do not feed into the statistics.
                auto rm_missing_values = [&]() GL_GCC_ONLY(
                    GL_HOT_NOINLINE_FLATTEN) {
                  // Fast track the common single-occurrence case.
                  if (exclusion_indices.size() == 1) {
                    size_t rm_index = exclusion_indices[0];
                    auto it = std::remove_if(
                        idx_value_vect.begin(), idx_value_vect.end(),
                        [&](const std::pair<size_t, double>& p) {
                          return p.first == rm_index;
                        });
                    idx_value_vect.resize(it - idx_value_vect.begin());
                  } else {
                    std::sort(exclusion_indices.begin(),
                              exclusion_indices.end());
                    auto it = std::remove_if(
                        idx_value_vect.begin(), idx_value_vect.end(),
                        [&](const std::pair<size_t, double>& p) {
                          // Is it in the exclusion_indices?
                          auto fit = std::lower_bound(exclusion_indices.begin(),
                                                      exclusion_indices.end(),
                                                      p.first);
                          return (fit != exclusion_indices.end() &&
                                  *fit == p.first);
                        });
                    idx_value_vect.resize(it - idx_value_vect.begin());
                  }
                  exclusion_indices.clear();
                };
                rm_missing_values();
              }

              m_stats->update_dict_statistics(thread_idx, idx_value_vect);
            }
          }
          break;
        }

        case ml_column_mode::CATEGORICAL_SORTED:
        case ml_column_mode::UNTRANSLATED:
          // Nothing is written for these modes.
          break;
      }  // end switch over column mode
    }    // End loop over columns

    if (!rm.data_size_is_constant) {
      // Back-fill the leading slot with the number of entry_data slots
      // this row occupies, the leading slot included.
      DASSERT_TRUE(n_row_elements_write_index != size_t(-1));
      block_output.entry_data[n_row_elements_write_index].index_value =
          block_output.entry_data.size() - n_row_elements_write_index;
    }

    // Track the maximum row size.
    max_row_size = std::max(max_row_size, row_size);
  }  // End loop over rows in buffer

  return max_row_size;
}