aios/storage/indexlib/index/common/numeric_compress/EquivalentCompressWriter.h (510 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <memory> #include "indexlib/file_system/file/FileWriter.h" #include "indexlib/file_system/fslib/FslibWrapper.h" #include "indexlib/index/common/numeric_compress/EquivalentCompressReader.h" #include "indexlib/util/slice_array/ByteAlignedSliceArray.h" namespace indexlib::index { class EquivalentCompressWriterBase { public: typedef indexlib::util::AlignedSliceArray<uint64_t> IndexArray; typedef indexlib::util::ByteAlignedSliceArray DataArray; public: EquivalentCompressWriterBase() : _slotItemPowerNum(0) , _slotItemCount(0) , _itemCount(0) , _dataArrayCursor(0) , _indexArray(nullptr) , _dataArray(nullptr) , _pool(nullptr) { } explicit EquivalentCompressWriterBase(autil::mem_pool::PoolBase* pool) : _slotItemPowerNum(0) , _slotItemCount(0) , _itemCount(0) , _dataArrayCursor(0) , _indexArray(nullptr) , _dataArray(nullptr) , _pool(pool) { } virtual ~EquivalentCompressWriterBase() { indexlib::IE_POOL_COMPATIBLE_DELETE_CLASS(_pool, _indexArray); indexlib::IE_POOL_COMPATIBLE_DELETE_CLASS(_pool, _dataArray); } public: void Init(uint32_t slotItemCount) { Reset(); _slotItemPowerNum = GetUpperPowerNumber(slotItemCount); _slotItemCount = (1 << _slotItemPowerNum); _indexArray = IE_POOL_COMPATIBLE_NEW_CLASS(_pool, IndexArray, INDEX_SLICE_LEN, 1, nullptr, _pool); _dataArray = IE_POOL_COMPATIBLE_NEW_CLASS(_pool, DataArray, DATA_SLICE_LEN, 1, nullptr, _pool); InitSlotBuffer(); } size_t GetCompressLength() { FlushSlotBuffer(); size_t compLen = 2 * sizeof(uint32_t); if (_itemCount == 0) { return compLen; } uint32_t slotCount = (_itemCount + _slotItemCount - 1) / _slotItemCount; compLen += (slotCount * sizeof(uint64_t)); compLen += _dataArrayCursor; return compLen; } size_t DumpBuffer(uint8_t* buffer, size_t bufferLen) { FlushSlotBuffer(); assert(bufferLen >= GetCompressLength()); uint8_t* cursor = buffer; *(uint32_t*)cursor = _itemCount; cursor += sizeof(uint32_t); *(uint32_t*)cursor = _slotItemPowerNum; cursor += sizeof(uint32_t); for (uint32_t i = 0; i < _indexArray->GetSliceNum(); ++i) { uint32_t sliceLen = _indexArray->GetSliceDataLen(i); uint64_t* slice = _indexArray->GetSlice(i); memcpy(cursor, slice, sliceLen * sizeof(uint64_t)); cursor += (sliceLen * sizeof(uint64_t)); } for (uint32_t i = 0; i < _dataArray->GetSliceNum(); ++i) { uint32_t sliceLen = _dataArray->GetSliceDataLen(i); char* slice = _dataArray->GetSlice(i); memcpy(cursor, slice, sliceLen); cursor += sliceLen; } return cursor - buffer; } Status Dump(const std::shared_ptr<indexlib::file_system::FileWriter>& file) { FlushSlotBuffer(); auto [st, _] = file->Write(&_itemCount, sizeof(uint32_t)).StatusWith(); RETURN_IF_STATUS_ERROR(st, "write item count failed, itemCount[%u]", _itemCount); std::tie(st, _) = file->Write(&_slotItemPowerNum, sizeof(uint32_t)).StatusWith(); RETURN_IF_STATUS_ERROR(st, "write item power num failed, powerNum[%u]", _slotItemPowerNum); for (uint32_t i = 0; i < _indexArray->GetSliceNum(); ++i) { uint32_t sliceLen = _indexArray->GetSliceDataLen(i); uint64_t* slice = _indexArray->GetSlice(i); std::tie(st, _) = file->Write(slice, sliceLen * sizeof(uint64_t)).StatusWith(); RETURN_IF_STATUS_ERROR(st, "write index slice len failed, len[%u]", sliceLen); } for (uint32_t i = 0; i < _dataArray->GetSliceNum(); ++i) { uint32_t sliceLen = _dataArray->GetSliceDataLen(i); char* slice = _dataArray->GetSlice(i); std::tie(st, _) = file->Write(slice, sliceLen).StatusWith(); RETURN_IF_STATUS_ERROR(st, "write data slice len failed, len[%u]", sliceLen); } return Status::OK(); } protected: virtual void InitSlotBuffer() = 0; virtual void FreeSlotBuffer() = 0; virtual void FlushSlotBuffer() = 0; void Reset() { FreeSlotBuffer(); indexlib::IE_POOL_COMPATIBLE_DELETE_CLASS(_pool, _indexArray); indexlib::IE_POOL_COMPATIBLE_DELETE_CLASS(_pool, _dataArray); _slotItemPowerNum = 0; _slotItemCount = 0; _itemCount = 0; _dataArrayCursor = 0; } static uint32_t GetUpperPowerNumber(uint32_t slotItemCount) { uint32_t powerNum = 6; // min slotItemNum is 2^6 = 64 while ((uint32_t)(1 << powerNum) < slotItemCount) { ++powerNum; } return powerNum; } private: constexpr static uint32_t INDEX_SLICE_LEN = 8 * 1024; constexpr static uint32_t DATA_SLICE_LEN = 256 * 1024; protected: uint32_t _slotItemPowerNum; uint32_t _slotItemCount; uint32_t _itemCount; size_t _dataArrayCursor; IndexArray* _indexArray; DataArray* _dataArray; autil::mem_pool::PoolBase* _pool; protected: friend class EquivalentCompressWriterTest; AUTIL_LOG_DECLARE(); }; /////////////////////////////////////////////////////////// template <typename T> class EquivalentCompressWriter : public EquivalentCompressWriterBase { public: typedef typename EquivalentCompressTraits<T>::ZigZagEncoder ZigZagEncoder; typedef typename EquivalentCompressTraits<T>::EncoderType UT; public: EquivalentCompressWriter() : EquivalentCompressWriterBase(nullptr), _slotBuffer(nullptr), _cursorInBuffer(0) {} explicit EquivalentCompressWriter(autil::mem_pool::PoolBase* pool) : EquivalentCompressWriterBase(pool) , _slotBuffer(nullptr) , _cursorInBuffer(0) { } ~EquivalentCompressWriter() { FreeSlotBuffer(); } public: // could be called repeatedly before calling FlushSlotBuffer // eg: GetCompressLength / DumpBuffer/ DumpFile / Dump void CompressData(T* data, uint32_t count); void CompressData(const std::vector<T>& dataVec) { CompressData((T*)dataVec.data(), dataVec.size()); } Status CompressData(const EquivalentCompressReader<T>& reader); public: static std::pair<Status, size_t> CalculateCompressLength(T* data, uint32_t count, uint32_t slotItemCount) { ItemIteratorTyped<T> itemIter(data, count); // no exception here, so can we ignore the status here ? return CalculateCompressLength(itemIter, slotItemCount); } static std::pair<Status, size_t> CalculateCompressLength(const std::vector<T>& dataVec, uint32_t slotItemCount) { ItemIteratorTyped<T> itemIter((const T*)dataVec.data(), (uint32_t)dataVec.size()); return CalculateCompressLength(itemIter, slotItemCount); } static std::pair<Status, size_t> CalculateCompressLength(const EquivalentCompressReader<T>& reader, uint32_t slotItemCount) { return CalculateCompressLength(reader.CreateIterator(), slotItemCount); } template <typename ItemIterator> static std::pair<Status, size_t> CalculateCompressLength(ItemIterator iter, uint32_t slotItemCount) { size_t compSize = sizeof(uint32_t) * 2; uint32_t powerNum = EquivalentCompressWriter<T>::GetUpperPowerNumber(slotItemCount); slotItemCount = (1 << powerNum); UT* buffer = new UT[slotItemCount]; uint32_t cursorInBuffer = 0; uint32_t count = 0; while (iter.HasNext()) { auto [status, nextVal] = iter.Next(); RETURN2_IF_STATUS_ERROR(status, 0, "read next value fail when calculate compress length"); buffer[cursorInBuffer++] = ZigZagEncoder::Encode(nextVal); if (cursorInBuffer == slotItemCount) { compSize += CalculateDeltaBufferSize(buffer, cursorInBuffer); cursorInBuffer = 0; } ++count; } if (cursorInBuffer > 0) { compSize += CalculateDeltaBufferSize(buffer, cursorInBuffer); } delete[] buffer; // calculate slotItems size uint32_t slotMask = (1 << powerNum) - 1; uint32_t slotCount = (count + slotMask) >> powerNum; compSize += sizeof(uint64_t) * slotCount; return std::make_pair(Status::OK(), compSize); } protected: void InitSlotBuffer() override { assert(!_slotBuffer); _slotBuffer = IE_POOL_COMPATIBLE_NEW_VECTOR(_pool, UT, _slotItemCount); _cursorInBuffer = 0; } void FreeSlotBuffer() override { IE_POOL_COMPATIBLE_DELETE_VECTOR(_pool, _slotBuffer, _slotItemCount); _cursorInBuffer = 0; } void FlushSlotBuffer() override { if (_cursorInBuffer == 0) { return; } UT minValue = 0; UT maxDelta = GetMaxDeltaInBuffer(_slotBuffer, _cursorInBuffer, minValue); if (NeedStoreDelta(maxDelta, minValue)) { ConvertToDeltaBuffer(_slotBuffer, _cursorInBuffer, minValue); SlotItemType slotType = GetSlotItemType(maxDelta); AppendSlotItem(slotType, _dataArrayCursor); _dataArrayCursor += FlushDeltaBuffer(_slotBuffer, _cursorInBuffer, minValue, slotType); } else { AppendSlotItem(SIT_EQUAL, minValue); } _itemCount += _cursorInBuffer; _cursorInBuffer = 0; } private: static UT GetMaxDeltaInBuffer(UT* slotBuffer, uint32_t itemCount, UT& minValue) { assert(itemCount > 0); UT targetValue = slotBuffer[0]; UT baseValue = targetValue; UT maxDelta = 0; for (size_t idx = 1; idx < itemCount; ++idx) { UT curValue = slotBuffer[idx]; if (curValue != targetValue) { if (curValue <= baseValue) { maxDelta += (baseValue - curValue); baseValue = curValue; } else { UT delta = curValue - baseValue; if (delta > maxDelta) { maxDelta = delta; } } } } minValue = baseValue; return maxDelta; } static void ConvertToDeltaBuffer(UT* slotBuffer, uint32_t itemCount, UT minValue) { for (size_t i = 0; i < itemCount; ++i) { assert(slotBuffer[i] >= minValue); slotBuffer[i] = slotBuffer[i] - minValue; } } static size_t GetMinDeltaBufferSize(UT maxDelta, uint32_t bufferItemCount) { SlotItemType slotType = GetSlotItemType(maxDelta); return GetCompressDeltaBufferSize(slotType, bufferItemCount); } static size_t CalculateDeltaBufferSize(UT* buffer, uint32_t cursorInBuffer); static bool NeedStoreDelta(UT maxDelta, UT minValue); size_t FlushDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, UT minValue, SlotItemType slotType); size_t CompressDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, SlotItemType slotType); void AppendSlotItem(SlotItemType slotType, uint64_t value); // for long value static size_t CalculateLongValueDeltaBufferSize(UT* buffer, uint32_t cursorInBuffer); static bool NeedStoreLongValueDelta(UT maxDelta, UT minValue); size_t FlushLongValueDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, UT minValue, SlotItemType slotType); void AppendLongValueSlotItem(SlotItemType slotType, uint64_t value); private: UT* _slotBuffer; uint32_t _cursorInBuffer; private: friend class EquivalentCompressReader<T>; friend class EquivalentCompressWriterTest; AUTIL_LOG_DECLARE(); }; AUTIL_LOG_SETUP_TEMPLATE(indexlib.index, EquivalentCompressWriter, T); /////////////////////////////////////////////////////////////// template <typename T> inline size_t EquivalentCompressWriter<T>::CompressDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, SlotItemType slotType) { for (uint32_t i = 0; i < itemCount; i++) { EquivalentCompressReader<T>::InplaceUpdateDeltaArray((uint8_t*)deltaBuffer, slotType, i, deltaBuffer[i]); } return GetCompressDeltaBufferSize(slotType, itemCount); } template <typename T> inline size_t EquivalentCompressWriter<T>::FlushDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, UT minValue, SlotItemType slotType) { size_t idx = 0; _dataArray->SetTypedValue<UT>(_dataArrayCursor + idx, minValue); idx += sizeof(UT); size_t compsize = CompressDeltaBuffer(deltaBuffer, itemCount, slotType); _dataArray->SetList(_dataArrayCursor + idx, (const char*)deltaBuffer, compsize); idx += compsize; return idx; } template <typename T> inline size_t EquivalentCompressWriter<T>::FlushLongValueDeltaBuffer(UT* deltaBuffer, uint32_t itemCount, UT minValue, SlotItemType slotType) { #undef DumpDeltaFlag #define DumpDeltaFlag(DELTA_FLAG) \ UT deltaFlag = DELTA_FLAG; \ _dataArray->SetTypedValue<UT>(_dataArrayCursor + idx, deltaFlag); \ idx += sizeof(UT); size_t idx = 0; _dataArray->SetTypedValue<UT>(_dataArrayCursor + idx, minValue); idx += sizeof(UT); DumpDeltaFlag(SlotItemTypeToDeltaFlag(slotType)); size_t compsize = CompressDeltaBuffer(deltaBuffer, itemCount, slotType); _dataArray->SetList(_dataArrayCursor + idx, (const char*)deltaBuffer, compsize); idx += compsize; return idx; } template <> inline size_t EquivalentCompressWriter<uint64_t>::FlushDeltaBuffer(uint64_t* deltaBuffer, uint32_t itemCount, uint64_t minValue, SlotItemType slotType) { return FlushLongValueDeltaBuffer(deltaBuffer, itemCount, minValue, slotType); } template <> inline size_t EquivalentCompressWriter<int64_t>::FlushDeltaBuffer(uint64_t* deltaBuffer, uint32_t itemCount, uint64_t minValue, SlotItemType slotType) { return FlushLongValueDeltaBuffer(deltaBuffer, itemCount, minValue, slotType); } template <> inline size_t EquivalentCompressWriter<double>::FlushDeltaBuffer(uint64_t* deltaBuffer, uint32_t itemCount, uint64_t minValue, SlotItemType slotType) { return FlushLongValueDeltaBuffer(deltaBuffer, itemCount, minValue, slotType); } template <typename T> inline bool EquivalentCompressWriter<T>::NeedStoreDelta(UT maxDelta, UT minValue) { return (maxDelta > 0); } template <typename T> inline bool EquivalentCompressWriter<T>::NeedStoreLongValueDelta(UT maxDelta, UT minValue) { return (maxDelta > 0 || minValue > (uint64_t)std::numeric_limits<int64_t>::max()); } template <> inline bool EquivalentCompressWriter<uint64_t>::NeedStoreDelta(uint64_t maxDelta, uint64_t minValue) { return NeedStoreLongValueDelta(maxDelta, minValue); } template <> inline bool EquivalentCompressWriter<int64_t>::NeedStoreDelta(uint64_t maxDelta, uint64_t minValue) { return NeedStoreLongValueDelta(maxDelta, minValue); } template <> inline bool EquivalentCompressWriter<double>::NeedStoreDelta(uint64_t maxDelta, uint64_t minValue) { return NeedStoreLongValueDelta(maxDelta, minValue); } template <typename T> inline void EquivalentCompressWriter<T>::AppendSlotItem(SlotItemType slotType, uint64_t value) { union SlotItemUnion { SlotItem slotItem; uint64_t slotValue; }; SlotItemUnion slotUnion; slotUnion.slotItem.slotType = slotType; slotUnion.slotItem.value = value; _indexArray->Set(_itemCount >> _slotItemPowerNum, slotUnion.slotValue); } template <typename T> inline void EquivalentCompressWriter<T>::AppendLongValueSlotItem(SlotItemType slotType, uint64_t value) { union SlotItemUnion { LongSlotItem slotItem; uint64_t slotValue; }; SlotItemUnion slotUnion; slotUnion.slotItem.isValue = (slotType == SIT_EQUAL) ? 1 : 0; slotUnion.slotItem.value = value; _indexArray->Set(_itemCount >> _slotItemPowerNum, slotUnion.slotValue); } template <> inline void EquivalentCompressWriter<uint64_t>::AppendSlotItem(SlotItemType slotType, uint64_t value) { AppendLongValueSlotItem(slotType, value); } template <> inline void EquivalentCompressWriter<int64_t>::AppendSlotItem(SlotItemType slotType, uint64_t value) { AppendLongValueSlotItem(slotType, value); } template <> inline void EquivalentCompressWriter<double>::AppendSlotItem(SlotItemType slotType, uint64_t value) { AppendLongValueSlotItem(slotType, value); } template <typename T> inline void EquivalentCompressWriter<T>::CompressData(T* data, uint32_t count) { for (uint32_t idx = 0; idx < count; ++idx) { assert(_cursorInBuffer < _slotItemCount); _slotBuffer[_cursorInBuffer++] = ZigZagEncoder::Encode(data[idx]); if (_cursorInBuffer == _slotItemCount) { FlushSlotBuffer(); } } } template <typename T> inline Status EquivalentCompressWriter<T>::CompressData(const EquivalentCompressReader<T>& reader) { uint32_t count = reader.Size(); for (size_t idx = 0; idx < count; ++idx) { assert(_cursorInBuffer < _slotItemCount); auto [status, curVal] = reader[idx]; RETURN_IF_STATUS_ERROR(status, "read data fail from EquivalentCompressReader"); _slotBuffer[_cursorInBuffer++] = ZigZagEncoder::Encode(curVal); if (_cursorInBuffer == _slotItemCount) { FlushSlotBuffer(); } } return Status::OK(); } template <typename T> inline size_t EquivalentCompressWriter<T>::CalculateDeltaBufferSize(UT* buffer, uint32_t bufferSize) { UT minValue = 0; UT maxDelta = GetMaxDeltaInBuffer(buffer, bufferSize, minValue); if (!NeedStoreDelta(maxDelta, minValue)) { return 0; } return sizeof(UT) + GetMinDeltaBufferSize(maxDelta, bufferSize); } template <typename T> inline size_t EquivalentCompressWriter<T>::CalculateLongValueDeltaBufferSize(UT* buffer, uint32_t cursorInBuffer) { UT minValue = 0; UT maxDelta = GetMaxDeltaInBuffer(buffer, cursorInBuffer, minValue); if (!NeedStoreDelta(maxDelta, minValue)) { return 0; } return sizeof(UT) * 2 + GetMinDeltaBufferSize(maxDelta, cursorInBuffer); } template <> inline size_t EquivalentCompressWriter<uint64_t>::CalculateDeltaBufferSize(uint64_t* buffer, uint32_t cursorInBuffer) { return CalculateLongValueDeltaBufferSize(buffer, cursorInBuffer); } template <> inline size_t EquivalentCompressWriter<int64_t>::CalculateDeltaBufferSize(uint64_t* buffer, uint32_t cursorInBuffer) { return CalculateLongValueDeltaBufferSize(buffer, cursorInBuffer); } template <> inline size_t EquivalentCompressWriter<double>::CalculateDeltaBufferSize(uint64_t* buffer, uint32_t cursorInBuffer) { return CalculateLongValueDeltaBufferSize(buffer, cursorInBuffer); } ////////////////////////////////not support type///////////////////// template <> inline size_t EquivalentCompressWriter<autil::uint128_t>::CalculateDeltaBufferSize(autil::uint128_t* buffer, uint32_t cursorInBuffer) { assert(false); return 0; } template <> inline void EquivalentCompressWriter<autil::uint128_t>::CompressData(autil::uint128_t* data, uint32_t count) { assert(false); } template <> inline Status EquivalentCompressWriter<autil::uint128_t>::CompressData(const EquivalentCompressReader<autil::uint128_t>& reader) { assert(false); return Status::InternalError("un-support compress operation for type : [autil::uint128_t]"); } template <> inline std::pair<Status, size_t> EquivalentCompressWriter<autil::uint128_t>::CalculateCompressLength(autil::uint128_t* data, uint32_t count, uint32_t slotItemCount) { assert(false); return std::make_pair(Status::InternalError("un-support type [autil::uint128]"), 0); } template <> inline std::pair<Status, size_t> EquivalentCompressWriter<autil::uint128_t>::CalculateCompressLength( const EquivalentCompressReader<autil::uint128_t>& reader, uint32_t slotItemCount) { assert(false); return std::make_pair(Status::InternalError("un-support type [autil::uint128]"), 0); } template <> inline void EquivalentCompressWriter<autil::uint128_t>::FlushSlotBuffer() { assert(false); } } // namespace indexlib::index