in src/native/diffs/core/slicer.cpp [315:447]
void slicer::slice_and_populate_slice_store(const item_definition &item_to_slice)
{
// printf("Slicing and populating using item: %s\n", item_to_slice.to_string().c_str());
if (m_state != slicing_state::running)
{
throw errors::user_exception(
errors::error_code::diff_slicing_invalid_state,
"slicer::slice_and_populate_slice_store: Expected slicing state to be running, actual: "
+ std::to_string((int)m_state));
}
if (m_items_to_slices_requested.count(item_to_slice) == 0)
{
throw errors::user_exception(
errors::error_code::diff_slicing_no_slices_requested,
"slicer::slice_and_populate_slice_store: No slices requested");
}
std::unique_ptr<offset_to_item_map> slices_requested;
slices_requested.swap(m_items_to_slices_requested[item_to_slice]);
m_items_to_slices_requested.erase(item_to_slice);
std::shared_ptr<prepared_item> prepared_item_to_slice = m_item_to_slice_prepared[item_to_slice];
m_running_thread_count++;
m_starting_thread_count--;
m_state_cv.notify_all();
// this may end up kicking off more prepares and slicing needed
auto reader = prepared_item_to_slice->make_sequential_reader();
// printf("We're slicing %s\n", item_to_slice.to_string().c_str());
hashing::hasher hasher(hashing::algorithm::sha256);
for (auto offset_and_slice : *slices_requested)
{
if (m_state == slicing_state::paused)
{
m_paused_thread_count++;
m_running_thread_count--;
m_state_cv.notify_all();
std::unique_lock<std::mutex> lock(m_state_mutex);
m_state_cv.wait(lock, [&] { return (m_state != slicing_state::paused); });
m_running_thread_count++;
m_paused_thread_count--;
}
if (m_state == slicing_state::cancelled)
{
return;
}
auto offset = offset_and_slice.first;
auto slice = offset_and_slice.second;
ADU_LOG(
"slicer::slice_and_populate_slice_store: Slicing {} out of {}",
slice.to_string(),
item_to_slice.to_string());
auto current_offset = reader->tellg();
ADU_LOG("Expecting slice: {} at offset: {}. Current offset: {}", slice.to_string(), offset, current_offset);
if (current_offset > offset)
{
std::string msg = "slicer::request_slice: Found overlap with slices. ";
msg += "offset: " + std::to_string(offset);
msg += ", current: " + std::to_string(current_offset);
throw errors::user_exception(errors::error_code::diff_slicing_request_slice_overlap, msg);
}
auto to_skip = offset - current_offset;
if (to_skip)
{
ADU_LOG(
"slicer::slice_and_populate_slice_store: Skipped {} in reader of {} bytes.", to_skip, reader->size());
reader->skip(to_skip);
}
auto slice_vector = std::make_shared<std::vector<char>>();
if (slice.size() > std::numeric_limits<size_t>::max())
{
throw errors::user_exception(errors::error_code::diff_slicing_request_size_too_large);
}
slice_vector->reserve(static_cast<size_t>(slice.size()));
reader->read(std::span<char>(slice_vector->data(), slice_vector->capacity()));
ADU_LOG(
"slicer::slice_and_populate_slice_store: Read {} in reader of {} bytes.",
slice_vector->capacity(),
reader->size());
hasher.reset();
hasher.hash_data(std::string_view{slice_vector->data(), slice_vector->capacity()});
auto slice_hash = hasher.get_hash();
if (!slice.has_matching_hash(slice_hash))
{
std::string msg = "Generated slice doesn't match expected slice. ";
msg += "Current offset: " + std::to_string(offset);
msg += ", Slice: " + slice.to_string();
msg += ", slice_hash: " + slice_hash.get_string();
throw errors::user_exception(errors::error_code::diff_slicing_produced_hash_mismatch, msg);
}
std::shared_ptr<io::reader_factory> cache_entry_factory = std::make_shared<io::buffer::reader_factory>(
slice_vector, io::buffer::io_device::size_kind::vector_capacity);
auto prep_slice = std::make_shared<diffs::core::prepared_item>(
slice, diffs::core::prepared_item::reader_kind{cache_entry_factory});
{
std::lock_guard<std::mutex> lock(m_store_mutex);
if (m_slice_item_request_counts.count(slice) == 0)
{
throw errors::user_exception(
errors::error_code::diff_slicing_no_requests_for_slice, "No requests for slice.");
}
auto count = m_slice_item_request_counts[slice];
ADU_LOG("Stored slice for {}.", slice.to_string());
m_stored_slices.insert(std::pair{slice, std::pair{prep_slice, count}});
m_slice_item_request_counts.erase(slice);
}
ADU_LOG("Notified: m_stored_slices_cv for {}", slice.to_string());
m_store_cv.notify_all();
}
}