lib/core/CDataFrameRowSlice.cc (303 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <core/CDataFrameRowSlice.h> #include <core/CBase64Filter.h> #include <core/CDataFrame.h> #include <core/CHashing.h> #include <core/CLogger.h> #include <core/CMemoryDef.h> #include <core/CompressUtils.h> #include <boost/filesystem.hpp> #include <fstream> #include <memory> #include <vector> namespace ml { namespace core { using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>; using TFloatVecItr = TFloatVec::iterator; using TInt32Vec = std::vector<std::int32_t>; using TInt32VecCItr = TInt32Vec::const_iterator; namespace { using namespace data_frame_row_slice_detail; //! \brief A handle for reading CRawDataFrameRowSlice objects. //! //! DESCRIPTION:\n //! This stores a reference to the underlying memory. This is primarily //! intended to stop duplication of the underlying data frame state. //! Together with our memory mapped vector type this means algorithms //! on top of a raw data frame can work entirely in terms of the raw //! values stored in the data frame. class CMainMemoryDataFrameRowSliceHandle final : public CDataFrameRowSliceHandleImpl { public: CMainMemoryDataFrameRowSliceHandle(std::size_t firstRow, TFloatVec& rows, const TInt32Vec& docHashes) : m_FirstRow{firstRow}, m_Rows{rows}, m_DocHashes{docHashes} {} TImplPtr clone() const override { return std::make_unique<CMainMemoryDataFrameRowSliceHandle>(m_FirstRow, m_Rows, m_DocHashes); } std::size_t indexOfFirstRow() const override { return m_FirstRow; } TFloatVec& rows() const override { return m_Rows; } const TInt32Vec& docHashes() const override { return m_DocHashes; } bool bad() const override { return false; } private: using TFloatVecRef = std::reference_wrapper<TFloatVec>; using TInt32VecCRef = std::reference_wrapper<const TInt32Vec>; private: std::size_t m_FirstRow; TFloatVecRef m_Rows; TInt32VecCRef m_DocHashes; }; //! \brief A handle for reading CRawDataFrameRowSlice objects. //! //! DESCRIPTION:\n //! This stores a copy of values since these are created on-the-fly when //! the slice is inflated. class COnDiskDataFrameRowSliceHandle final : public CDataFrameRowSliceHandleImpl { public: COnDiskDataFrameRowSliceHandle(std::size_t firstRow, TFloatVec rows, TInt32Vec docHashes) : m_FirstRow{firstRow}, m_Rows{std::move(rows)}, m_DocHashes{std::move(docHashes)} {} TImplPtr clone() const override { return std::make_unique<COnDiskDataFrameRowSliceHandle>(m_FirstRow, m_Rows, m_DocHashes); } std::size_t indexOfFirstRow() const override { return m_FirstRow; } TFloatVec& rows() const override { return m_Rows; } const TInt32Vec& docHashes() const override { return m_DocHashes; } bool bad() const override { return false; } private: std::size_t m_FirstRow; mutable TFloatVec m_Rows; TInt32Vec m_DocHashes; }; //! \brief The implementation of a bad handle. //! //! DESCRIPTION:\n //! This is used to signal that there is a problem accessing the slice. class CBadDataFrameRowSliceHandle final : public CDataFrameRowSliceHandleImpl { public: TImplPtr clone() const override { return std::make_unique<CBadDataFrameRowSliceHandle>(); } std::size_t indexOfFirstRow() const override { return 0; } TFloatVec& rows() const override { return m_EmptyRows; } const TInt32Vec& docHashes() const override { return m_EmptyDocHashes; } bool bad() const override { return true; } private: //! Stub for the rows. mutable TFloatVec m_EmptyRows; //! Stub for the row document ids. TInt32Vec m_EmptyDocHashes; }; //! Checksum \p vec. template<typename T, typename ALLOCATOR> std::uint64_t computeChecksum(const std::vector<T, ALLOCATOR>& vec) { return CHashing::murmurHash64(vec.data(), static_cast<int>(sizeof(T) * vec.size()), 0); } //! Checksum \p rows and \p docHashes. std::uint64_t computeChecksum(const TFloatVec& rows, const TInt32Vec& docHashes) { return CHashing::hashCombine(computeChecksum(rows), computeChecksum(docHashes)); } } //////// CDataFrameRowSliceHandle //////// CDataFrameRowSliceHandle::CDataFrameRowSliceHandle(TImplPtr impl) : m_Impl{std::move(impl)} { } CDataFrameRowSliceHandle::CDataFrameRowSliceHandle(const CDataFrameRowSliceHandle& other) : m_Impl{other.m_Impl != nullptr ? other.m_Impl->clone() : nullptr} { } CDataFrameRowSliceHandle::CDataFrameRowSliceHandle(CDataFrameRowSliceHandle&& other) noexcept : m_Impl{std::move(other.m_Impl)} { } CDataFrameRowSliceHandle& CDataFrameRowSliceHandle:: operator=(const CDataFrameRowSliceHandle& other) { if (other.m_Impl != nullptr) { other.m_Impl->clone(); } return *this; } CDataFrameRowSliceHandle& CDataFrameRowSliceHandle:: operator=(CDataFrameRowSliceHandle&& other) noexcept = default; std::size_t CDataFrameRowSliceHandle::size() const { return m_Impl->rows().size(); } std::size_t CDataFrameRowSliceHandle::indexOfFirstRow() const { return m_Impl->indexOfFirstRow(); } TFloatVecItr CDataFrameRowSliceHandle::beginRows() const { return m_Impl->rows().begin(); } TFloatVecItr CDataFrameRowSliceHandle::endRows() const { return m_Impl->rows().end(); } TInt32VecCItr CDataFrameRowSliceHandle::beginDocHashes() const { return m_Impl->docHashes().begin(); } TInt32VecCItr CDataFrameRowSliceHandle::endDocHashes() const { return m_Impl->docHashes().end(); } const TFloatVec& CDataFrameRowSliceHandle::rows() const { return m_Impl->rows(); } const CDataFrameRowSliceHandle::TInt32Vec& CDataFrameRowSliceHandle::docHashes() const { return m_Impl->docHashes(); } bool CDataFrameRowSliceHandle::bad() const { return m_Impl->bad(); } //////// CMainMemoryDataFrameRowSlice //////// CMainMemoryDataFrameRowSlice::CMainMemoryDataFrameRowSlice(std::size_t firstRow, TFloatVec rows, TInt32Vec docHashes) : m_FirstRow{firstRow}, m_Rows{std::move(rows)}, m_DocHashes{std::move(docHashes)} { LOG_TRACE(<< "slice size = " << m_Rows.size() << " capacity = " << m_Rows.capacity()); m_Rows.shrink_to_fit(); m_DocHashes.shrink_to_fit(); } void CMainMemoryDataFrameRowSlice::reserve(std::size_t numberColumns, std::size_t extraColumns) { // "Reserve" space at the end of each row for extraColumns extra columns. // Padding is inserted into the underlying vector which is skipped over // by the CRowConstIterator object. std::size_t numberRows{m_Rows.size() / numberColumns}; std::size_t newNumberColumns{numberColumns + extraColumns}; try { TFloatVec state(m_Rows.size() + numberRows * extraColumns); for (auto i = m_Rows.begin(), j = state.begin(); i != m_Rows.end(); i += numberColumns, j += newNumberColumns) { std::copy(i, i + numberColumns, j); } std::swap(state, m_Rows); } catch (const std::exception& e) { HANDLE_FATAL(<< "Environment error: failed to reserve " << extraColumns << " extra columns: caught '" << e.what() << "'. The process is likely out of memory."); } } std::size_t CMainMemoryDataFrameRowSlice::indexOfFirstRow() const { return m_FirstRow; } std::size_t CMainMemoryDataFrameRowSlice::indexOfLastRow(std::size_t rowCapacity) const { return m_FirstRow + m_Rows.size() / rowCapacity - 1; } CDataFrameRowSliceHandle CMainMemoryDataFrameRowSlice::read() { return {std::make_unique<CMainMemoryDataFrameRowSliceHandle>(m_FirstRow, m_Rows, m_DocHashes)}; } void CMainMemoryDataFrameRowSlice::write(const TFloatVec&, const TInt32Vec&) { // Nothing to do. } std::size_t CMainMemoryDataFrameRowSlice::staticSize() const { return sizeof(*this); } std::size_t CMainMemoryDataFrameRowSlice::memoryUsage() const { return memory::dynamicSize(m_Rows) + memory::dynamicSize(m_DocHashes); } std::uint64_t CMainMemoryDataFrameRowSlice::checksum() const { return computeChecksum(m_Rows, m_DocHashes); } //////// CTemporaryDirectory //////// namespace { //! Check if there is \p minimumSpace disk space available. void sufficientDiskSpaceAvailable(const boost::filesystem::path& path, std::size_t minimumSpace) { boost::system::error_code errorCode; auto spaceInfo = boost::filesystem::space(path, errorCode); if (errorCode) { HANDLE_FATAL(<< "Environment error: failed to retrieve disk information for '" << path << "' error '" << errorCode.message() << "'."); } if (spaceInfo.available < minimumSpace) { HANDLE_FATAL(<< "Environment error: insufficient disk space have '" << spaceInfo.available << "' and need '" << minimumSpace << "'."); } } } CTemporaryDirectory::CTemporaryDirectory(const std::string& name, std::size_t minimumSpace) : m_Name{name} { m_Name /= boost::filesystem::unique_path("dataframe-%%%%-%%%%-%%%%-%%%%"); LOG_TRACE(<< "Trying to create directory '" << m_Name << "'"); boost::system::error_code errorCode; boost::filesystem::create_directories(m_Name, errorCode); if (errorCode) { HANDLE_FATAL(<< "Environment error: failed to create temporary directory from: '" << m_Name << "' error '" << errorCode.message() << "'"); } sufficientDiskSpaceAvailable(m_Name, minimumSpace); LOG_TRACE(<< "Created '" << m_Name << "'"); } CTemporaryDirectory::~CTemporaryDirectory() { this->removeAll(); } std::string CTemporaryDirectory::name() const { return m_Name.string(); } void CTemporaryDirectory::removeAll() { boost::system::error_code errorCode; boost::filesystem::remove_all(m_Name, errorCode); if (errorCode) { LOG_WARN(<< "Failed to cleanup temporary data from: '" << m_Name << "' error '" << errorCode.message() << "'."); } } //////// COnDiskDataFrameRowSlice //////// COnDiskDataFrameRowSlice::COnDiskDataFrameRowSlice(const TTemporaryDirectoryPtr& directory, std::size_t firstRow, TFloatVec rows, TInt32Vec docHashes) : m_FirstRow{firstRow}, m_RowsCapacity{rows.size()}, m_DocHashesCapacity{docHashes.size()}, m_Directory{directory}, m_FileName{directory->name()}, m_Checksum{0} { m_FileName /= boost::filesystem::unique_path( "rows-" + std::to_string(firstRow) + "-%%%%-%%%%-%%%%-%%%%"); this->writeToDisk(rows, docHashes); } void COnDiskDataFrameRowSlice::reserve(std::size_t numberColumns, std::size_t extraColumns) { // "Reserve" space at the end of each row for extraColumns extra columns. // Padding is inserted into the underlying vector which is skipped over // by the CRowConstIterator object. try { TFloatVec oldRows(m_RowsCapacity); TInt32Vec docHashes(m_DocHashesCapacity); if (this->readFromDisk(oldRows, docHashes) == false) { HANDLE_FATAL(<< "Environment error: failed to read from row " << m_FirstRow << "."); } std::size_t numberRows{oldRows.size() / numberColumns}; sufficientDiskSpaceAvailable(m_Directory->name(), numberRows * extraColumns); std::size_t newNumberColumns{numberColumns + extraColumns}; TFloatVec newRows(numberRows * newNumberColumns, 0.0); for (auto i = oldRows.begin(), j = newRows.begin(); i != oldRows.end(); i += numberColumns, j += newNumberColumns) { std::copy(i, i + numberColumns, j); } this->writeToDisk(newRows, docHashes); } catch (const std::exception& e) { HANDLE_FATAL(<< "Environment error: failed to reserve " << extraColumns << " extra columns: caught '" << e.what() << "'."); } } std::size_t COnDiskDataFrameRowSlice::indexOfFirstRow() const { return m_FirstRow; } std::size_t COnDiskDataFrameRowSlice::indexOfLastRow(std::size_t rowCapacity) const { return m_FirstRow + m_RowsCapacity / rowCapacity - 1; } CDataFrameRowSliceHandle COnDiskDataFrameRowSlice::read() { LOG_TRACE(<< "Reading slice starting at row " << m_FirstRow); TFloatVec rows; TInt32Vec docHashes; try { if (this->readFromDisk(rows, docHashes) == false) { HANDLE_FATAL(<< "Environment error: failed to read from row " << m_FirstRow << "."); } if (computeChecksum(rows, docHashes) != m_Checksum) { HANDLE_FATAL(<< "Environment error: corrupt from row " << m_FirstRow << "."); } } catch (const std::exception& e) { HANDLE_FATAL(<< "Environment error: caught '" << e.what() << "' while reading from row " << m_FirstRow << "."); } return {std::make_unique<COnDiskDataFrameRowSliceHandle>( m_FirstRow, std::move(rows), std::move(docHashes))}; } void COnDiskDataFrameRowSlice::write(const TFloatVec& rows, const TInt32Vec& docHashes) { this->writeToDisk(rows, docHashes); } std::size_t COnDiskDataFrameRowSlice::staticSize() const { return sizeof(*this); } std::size_t COnDiskDataFrameRowSlice::memoryUsage() const { return memory::dynamicSize(m_Directory) + memory::dynamicSize(m_FileName.string()); } void COnDiskDataFrameRowSlice::writeToDisk(const TFloatVec& rows, const TInt32Vec& docHashes) { LOG_TRACE(<< "Writing slice starting at row " << m_FirstRow); m_RowsCapacity = rows.size(); m_DocHashesCapacity = docHashes.size(); m_Checksum = computeChecksum(rows, docHashes); LOG_TRACE(<< "Checksum = " << m_Checksum); std::size_t rowsBytes{sizeof(CFloatStorage) * rows.size()}; std::size_t docHashesBytes{sizeof(std::int32_t) * docHashes.size()}; LOG_TRACE(<< "rows bytes = " << rowsBytes); LOG_TRACE(<< "doc hashes bytes = " << docHashesBytes); std::ofstream file{m_FileName.string(), std::ios_base::trunc | std::ios_base::binary}; file.write(reinterpret_cast<const char*>(rows.data()), rowsBytes); file.write(reinterpret_cast<const char*>(docHashes.data()), docHashesBytes); } std::uint64_t COnDiskDataFrameRowSlice::checksum() const { return m_Checksum; } bool COnDiskDataFrameRowSlice::readFromDisk(TFloatVec& rows, TInt32Vec& docHashes) const { rows.resize(m_RowsCapacity); docHashes.resize(m_DocHashesCapacity); std::size_t rowsBytes{sizeof(CFloatStorage) * m_RowsCapacity}; std::size_t docHashesBytes{sizeof(std::int32_t) * m_DocHashesCapacity}; LOG_TRACE(<< "rows bytes = " << rowsBytes); LOG_TRACE(<< "doc hashes bytes = " << docHashesBytes); std::ifstream file{m_FileName.string(), std::ios_base::binary}; file.read(reinterpret_cast<char*>(rows.data()), rowsBytes); file.read(reinterpret_cast<char*>(docHashes.data()), docHashesBytes); return file.bad() == false; } } }