void slicer::slice_and_populate_slice_store()

in src/native/diffs/core/slicer.cpp [315:447]


/// Reads every slice requested from `item_to_slice` and publishes each one in the slice store.
///
/// The pending slice requests for `item_to_slice` are taken out of m_items_to_slices_requested and
/// visited in the iteration order of that offset map. For each entry the sequential reader over the
/// prepared item is advanced to the slice's offset, the slice's bytes are read into a fresh buffer,
/// the SHA-256 of that buffer is checked against the slice definition, and the result is inserted
/// into m_stored_slices (under m_store_mutex) before m_store_cv is notified.
///
/// While looping, the function honours m_state: it blocks on m_state_cv while the state is
/// `paused` and returns early when the state is `cancelled`.
///
/// @param item_to_slice The item whose requested slices should be produced.
///
/// @throws errors::user_exception with one of:
///   - diff_slicing_invalid_state: m_state is not `running` on entry.
///   - diff_slicing_no_slices_requested: nothing is registered for `item_to_slice`.
///   - diff_slicing_request_slice_overlap: the reader is already past a slice's offset.
///   - diff_slicing_request_size_too_large: a slice's size does not fit in size_t.
///   - diff_slicing_produced_hash_mismatch: the bytes read do not hash to the expected value.
///   - diff_slicing_no_requests_for_slice: no request count is recorded for a slice.
void slicer::slice_and_populate_slice_store(const item_definition &item_to_slice)
{
	if (m_state != slicing_state::running)
	{
		throw errors::user_exception(
			errors::error_code::diff_slicing_invalid_state,
			"slicer::slice_and_populate_slice_store: Expected slicing state to be running, actual: "
				+ std::to_string(static_cast<int>(m_state)));
	}

	// Single lookup: the iterator is reused both to take ownership of the request map and to erase
	// the (now empty) entry.
	auto requested_entry = m_items_to_slices_requested.find(item_to_slice);
	if (requested_entry == m_items_to_slices_requested.end())
	{
		throw errors::user_exception(
			errors::error_code::diff_slicing_no_slices_requested,
			"slicer::slice_and_populate_slice_store: No slices requested");
	}

	std::unique_ptr<offset_to_item_map> slices_requested;
	slices_requested.swap(requested_entry->second);
	m_items_to_slices_requested.erase(requested_entry);

	// NOTE(review): operator[] yields a null pointer when no prepared item is registered for
	// item_to_slice, which would then be dereferenced below - presumably callers guarantee the entry
	// exists; confirm.
	std::shared_ptr<prepared_item> prepared_item_to_slice = m_item_to_slice_prepared[item_to_slice];

	// Record the transition from "starting" to "running" and wake anyone waiting on the state.
	// NOTE(review): the running count is not decremented anywhere in this function (neither on
	// normal completion, cancellation, nor when an exception escapes) - presumably handled
	// elsewhere; confirm.
	m_running_thread_count++;
	m_starting_thread_count--;
	m_state_cv.notify_all();

	// this may end up kicking off more prepares and slicing needed
	auto reader = prepared_item_to_slice->make_sequential_reader();

	// One hasher is reused for all slices; it is reset before each slice is hashed.
	hashing::hasher hasher(hashing::algorithm::sha256);

	// Bind each entry by const reference: copying the pair would deep-copy the slice definition on
	// every iteration. The map is owned by slices_requested and is not modified inside the loop.
	for (const auto &offset_and_slice : *slices_requested)
	{
		if (m_state == slicing_state::paused)
		{
			// Report this thread as paused, then sleep until the state leaves `paused`.
			m_paused_thread_count++;
			m_running_thread_count--;
			m_state_cv.notify_all();
			std::unique_lock<std::mutex> lock(m_state_mutex);
			m_state_cv.wait(lock, [&] { return (m_state != slicing_state::paused); });
			m_running_thread_count++;
			m_paused_thread_count--;
		}

		if (m_state == slicing_state::cancelled)
		{
			return;
		}

		const auto offset = offset_and_slice.first;
		const auto &slice = offset_and_slice.second;

		ADU_LOG(
			"slicer::slice_and_populate_slice_store: Slicing {} out of {}",
			slice.to_string(),
			item_to_slice.to_string());

		auto current_offset = reader->tellg();

		ADU_LOG("Expecting slice: {} at offset: {}. Current offset: {}", slice.to_string(), offset, current_offset);

		// The reader only moves forward, so a slice that starts before the current position cannot
		// be served.
		if (current_offset > offset)
		{
			std::string msg = "slicer::request_slice: Found overlap with slices. ";
			msg += "offset: " + std::to_string(offset);
			msg += ", current: " + std::to_string(current_offset);
			throw errors::user_exception(errors::error_code::diff_slicing_request_slice_overlap, msg);
		}

		// Skip the gap between the previous slice and this one.
		auto to_skip = offset - current_offset;
		if (to_skip)
		{
			ADU_LOG(
				"slicer::slice_and_populate_slice_store: Skipped {} in reader of {} bytes.", to_skip, reader->size());
			reader->skip(to_skip);
		}

		auto slice_vector = std::make_shared<std::vector<char>>();
		if (slice.size() > std::numeric_limits<size_t>::max())
		{
			throw errors::user_exception(errors::error_code::diff_slicing_request_size_too_large);
		}

		// The buffer is reserved but never resized: the data lives in the vector's capacity, and the
		// reader factory below is told to use the capacity as the length.
		// NOTE(review): reserve() only guarantees capacity >= the requested amount, so this relies on
		// the implementation allocating exactly slice.size() bytes - confirm this is intended.
		slice_vector->reserve(static_cast<size_t>(slice.size()));

		reader->read(std::span<char>(slice_vector->data(), slice_vector->capacity()));
		ADU_LOG(
			"slicer::slice_and_populate_slice_store: Read {} in reader of {} bytes.",
			slice_vector->capacity(),
			reader->size());

		// Verify that what was read is what the slice definition promised.
		hasher.reset();
		hasher.hash_data(std::string_view{slice_vector->data(), slice_vector->capacity()});
		auto slice_hash = hasher.get_hash();

		if (!slice.has_matching_hash(slice_hash))
		{
			std::string msg = "Generated slice doesn't match expected slice. ";
			msg += "Current offset: " + std::to_string(offset);
			msg += ", Slice: " + slice.to_string();
			msg += ", slice_hash: " + slice_hash.get_string();
			throw errors::user_exception(errors::error_code::diff_slicing_produced_hash_mismatch, msg);
		}

		// Wrap the buffer so the stored slice can hand out readers over it.
		std::shared_ptr<io::reader_factory> cache_entry_factory = std::make_shared<io::buffer::reader_factory>(
			slice_vector, io::buffer::io_device::size_kind::vector_capacity);

		auto prep_slice = std::make_shared<diffs::core::prepared_item>(
			slice, diffs::core::prepared_item::reader_kind{cache_entry_factory});

		{
			std::lock_guard<std::mutex> lock(m_store_mutex);

			if (m_slice_item_request_counts.count(slice) == 0)
			{
				throw errors::user_exception(
					errors::error_code::diff_slicing_no_requests_for_slice, "No requests for slice.");
			}

			// Move the outstanding request count from the pending table into the stored entry.
			auto count = m_slice_item_request_counts[slice];

			ADU_LOG("Stored slice for {}.", slice.to_string());
			m_stored_slices.insert(std::pair{slice, std::pair{prep_slice, count}});

			m_slice_item_request_counts.erase(slice);
		}

		// Notify outside the store lock so woken waiters can acquire it immediately.
		ADU_LOG("Notified: m_stored_slices_cv for {}", slice.to_string());
		m_store_cv.notify_all();
	}
}