aios/storage/indexlib/index/inverted_index/truncate/SingleTruncateIndexWriter.cpp (348 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "indexlib/index/inverted_index/truncate/SingleTruncateIndexWriter.h" #include "autil/StringUtil.h" #include "indexlib/base/Status.h" #include "indexlib/file_system/fslib/FslibWrapper.h" #include "indexlib/index/DocMapper.h" #include "indexlib/index/inverted_index/Common.h" #include "indexlib/index/inverted_index/PostingWriterImpl.h" #include "indexlib/index/inverted_index/TermMatchData.h" #include "indexlib/index/inverted_index/config/InvertedIndexConfig.h" #include "indexlib/index/inverted_index/format/PostingFormatOption.h" #include "indexlib/index/inverted_index/format/ShortListOptimizeUtil.h" #include "indexlib/index/inverted_index/format/TermMetaDumper.h" #include "indexlib/index/inverted_index/format/dictionary/DictionaryTypedFactory.h" #include "indexlib/index/inverted_index/format/dictionary/TieredDictionaryWriter.h" #include "indexlib/index/inverted_index/truncate/DocInfoAllocator.h" #include "indexlib/index/inverted_index/truncate/TruncatePostingIterator.h" #include "indexlib/util/MMapAllocator.h" namespace indexlib::index { AUTIL_LOG_SETUP(index, SingleTruncateIndexWriter); SingleTruncateIndexWriter::SingleTruncateIndexWriter( const std::shared_ptr<indexlibv2::config::InvertedIndexConfig>& indexConfig, const std::shared_ptr<indexlibv2::index::DocMapper>& docMapper) : _hasNullTerm(false) , _allocator(NULL) , _byteSlicePool(NULL) , _bufferPool(NULL) , _sortFieldRef(NULL) , _docMapper(docMapper) , _desc(false) { assert(indexConfig != nullptr); _indexFormatOption.Init(indexConfig); _invertedIndexConfig = indexConfig; } SingleTruncateIndexWriter::~SingleTruncateIndexWriter() { _metaFileWriter.reset(); ReleasePoolResource(); } void SingleTruncateIndexWriter::Init( const std::vector<std::shared_ptr<indexlibv2::framework::SegmentMeta>>& targetSegmentMetas) { _targetSegmentMetas = targetSegmentMetas; } void SingleTruncateIndexWriter::SetParam(const std::shared_ptr<IEvaluator>& evaluator, const std::shared_ptr<DocCollector>& collector, const std::shared_ptr<TruncateTrigger>& truncTrigger, const std::string& truncateIndexName, const std::shared_ptr<DocInfoAllocator>& docInfoAllocator, const file_system::IOConfig& ioConfig) { _evaluator = evaluator; _collector = collector; _truncateTrigger = truncTrigger; _docInfoAllocator = docInfoAllocator; _ioConfig = ioConfig; _truncateIndexName = truncateIndexName; _allocator = new util::MMapAllocator(); _byteSlicePool = new autil::mem_pool::Pool(_allocator, DEFAULT_CHUNK_SIZE * 1024 * 1024); // TODO, if in memory segment use truncate, align size should be 8 _bufferPool = new autil::mem_pool::RecyclePool(_allocator, DEFAULT_CHUNK_SIZE * 1024 * 1024); _postingWriterResource.reset(new PostingWriterResource(&_simplePool, _byteSlicePool, _bufferPool, _indexFormatOption.GetPostingFormatOption())); _truncateDictKeySet.clear(); _hasNullTerm = false; } void SingleTruncateIndexWriter::SetTruncateIndexMetaInfo(const std::shared_ptr<file_system::FileWriter>& metaFile, const std::string& firstDimenSortFieldName, bool desc) { if (!metaFile) { return; } _metaFileWriter = metaFile; _sortFieldRef = _docInfoAllocator->GetReference(firstDimenSortFieldName); assert(_sortFieldRef != NULL); _desc = desc; } Status SingleTruncateIndexWriter::AddPosting(const DictKeyInfo& dictKey, const std::shared_ptr<PostingIterator>& postingIt, df_t docFreq) { auto st = PrepareResource(); RETURN_IF_STATUS_ERROR(st, "prepare single truncate writer resource failed."); if (HasTruncated(dictKey)) { return Status::OK(); } _collector->CollectDocIds(dictKey, postingIt, docFreq); if (!_collector->Empty()) { RETURN_IF_STATUS_ERROR(WriteTruncateMeta(dictKey, postingIt), "writer truncate meta failed."); WriteTruncateIndex(dictKey, postingIt); } ResetResource(); return Status::OK(); } bool SingleTruncateIndexWriter::BuildTruncateIndex(const std::shared_ptr<PostingIterator>& postingIt, const std::shared_ptr<MultiSegmentPostingWriter>& postingWriter) { const std::shared_ptr<DocCollector::DocIdVector>& docIdVec = _collector->GetTruncateDocIds(); for (size_t i = 0; i < docIdVec->size(); ++i) { docid_t docId = (*docIdVec)[i]; [[maybe_unused]] docid_t seekedDocId = postingIt->SeekDoc(docId); assert(seekedDocId == docId); auto targetSegmentId = _docMapper->GetLocalId(docId); auto postingWriterImpl = postingWriter->GetSegmentPostingWriterBySegId(targetSegmentId); if (postingWriterImpl == nullptr) { continue; } std::shared_ptr<TruncatePostingIterator> truncateIter = std::dynamic_pointer_cast<TruncatePostingIterator>(postingIt); std::shared_ptr<MultiSegmentPostingIterator> multiIter; if (truncateIter != nullptr) { multiIter = std::dynamic_pointer_cast<MultiSegmentPostingIterator>(truncateIter->GetCurrentIterator()); } assert(multiIter); TermMatchData termMatchData; multiIter->Unpack(termMatchData); AddPosition(postingWriterImpl, termMatchData, multiIter); EndDocument(postingWriterImpl, multiIter, termMatchData, docId); } postingWriter->EndSegment(); return true; } void SingleTruncateIndexWriter::AddPosition(const std::shared_ptr<PostingWriter>& postingWriter, TermMatchData& termMatchData, const std::shared_ptr<PostingIterator>& postingIt) { InDocPositionState* inDocPosState = termMatchData.GetInDocPositionState(); if (inDocPosState == NULL) { return; } InDocPositionIterator* posIter = inDocPosState->CreateInDocIterator(); if (postingIt->HasPosition()) { pos_t pos = 0; while ((pos = posIter->SeekPosition(pos)) != INVALID_POSITION) { postingWriter->AddPosition(pos, posIter->GetPosPayload(), 0); } } else if (_invertedIndexConfig->GetOptionFlag() & of_position_list) { // add fake pos (0) for bitmap truncate postingWriter->AddPosition(0, posIter->GetPosPayload(), 0); } delete posIter; termMatchData.FreeInDocPositionState(); } void SingleTruncateIndexWriter::EndDocument(const std::shared_ptr<PostingWriter>& postingWriter, const std::shared_ptr<MultiSegmentPostingIterator>& postingIt, const TermMatchData& termMatchData, docid_t docId) { assert(_invertedIndexConfig); if (_invertedIndexConfig->GetOptionFlag() & of_term_payload) { postingWriter->SetTermPayload(postingIt->GetTermPayLoad()); } if (termMatchData.HasFieldMap()) { fieldmap_t fieldMap = termMatchData.GetFieldMap(); postingWriter->EndDocument(docId, postingIt->GetDocPayload(), fieldMap); } else { postingWriter->EndDocument(docId, postingIt->GetDocPayload()); } } void SingleTruncateIndexWriter::DumpPosting(const DictKeyInfo& dictKey, const std::shared_ptr<PostingIterator>& postingIt, const std::shared_ptr<MultiSegmentPostingWriter>& postingWriter) { auto truncateIter = std::dynamic_pointer_cast<TruncatePostingIterator>(postingIt); std::shared_ptr<MultiSegmentPostingIterator> multiIter; if (truncateIter) { multiIter = std::dynamic_pointer_cast<MultiSegmentPostingIterator>(truncateIter->GetCurrentIterator()); } assert(multiIter != nullptr); postingWriter->Dump(dictKey, _outputSegmentResources, multiIter->GetTermPayLoad()); } void SingleTruncateIndexWriter::SaveDictKey(const DictKeyInfo& dictKey) { if (dictKey.IsNull()) { _hasNullTerm = true; } else { _truncateDictKeySet.insert(dictKey.GetKey()); } } std::shared_ptr<MultiSegmentPostingWriter> SingleTruncateIndexWriter::CreatePostingWriter(InvertedIndexType indexType) { std::shared_ptr<MultiSegmentPostingWriter> writer; switch (indexType) { case it_primarykey64: case it_primarykey128: case it_trie: break; default: PostingFormatOption formatOption = _indexFormatOption.GetPostingFormatOption(); writer.reset(new MultiSegmentPostingWriter(_postingWriterResource.get(), _targetSegmentMetas, formatOption)); break; } return writer; } void SingleTruncateIndexWriter::EndPosting() { ReleaseMetaResource(); ReleaseTruncateIndexResource(); ReleasePoolResource(); } bool SingleTruncateIndexWriter::NeedTruncate(const TruncateTriggerInfo& info) const { return _truncateTrigger->NeedTruncate(info); } Status SingleTruncateIndexWriter::PrepareResource() { if (_outputSegmentResources.size() > 0) { return Status::OK(); } _outputSegmentResources.reserve(_targetSegmentMetas.size()); for (const auto& segmentMeta : _targetSegmentMetas) { auto indexDir = segmentMeta->segmentDir->GetDirectory(indexlib::index::INVERTED_INDEX_PATH, /*throwExceptionIfNotExist*/ false); indexDir->RemoveDirectory(_truncateIndexName, file_system::RemoveOption::MayNonExist()); const std::shared_ptr<file_system::Directory>& truncateIndexDir = indexDir->MakeDirectory(_truncateIndexName); auto [st, statistics] = segmentMeta->segmentInfo->GetSegmentStatistics(); auto segmentStatistics = std::make_shared<indexlibv2::framework::SegmentStatistics>(statistics); RETURN_IF_STATUS_ERROR(st, "get segment statistics failed."); auto indexOutputSegmentResource = std::make_shared<IndexOutputSegmentResource>(); indexOutputSegmentResource->Init(truncateIndexDir, _invertedIndexConfig, _ioConfig, segmentStatistics, &_simplePool, /*hasAdaptiveBitMap*/ false); _outputSegmentResources.push_back(indexOutputSegmentResource); std::string optionString = IndexFormatOption::ToString(_indexFormatOption); truncateIndexDir->Store(INDEX_FORMAT_OPTION_FILE_NAME, optionString); } return Status::OK(); } bool SingleTruncateIndexWriter::HasTruncated(const DictKeyInfo& dictKey) { if (dictKey.IsNull()) { return _hasNullTerm; } return _truncateDictKeySet.find(dictKey.GetKey()) != _truncateDictKeySet.end(); } void SingleTruncateIndexWriter::WriteTruncateIndex(const DictKeyInfo& dictKey, const std::shared_ptr<PostingIterator>& postingIt) { std::shared_ptr<MultiSegmentPostingWriter> postingWriter = CreatePostingWriter(_invertedIndexConfig->GetInvertedIndexType()); postingIt->Reset(); if (BuildTruncateIndex(postingIt, postingWriter)) { DumpPosting(dictKey, postingIt, postingWriter); SaveDictKey(dictKey); } AUTIL_LOG(DEBUG, "index writer allocator use bytes : %luM", _allocator->getUsedBytes() / (1024 * 1024)); postingWriter.reset(); } Status SingleTruncateIndexWriter::WriteTruncateMeta(const DictKeyInfo& dictKey, const std::shared_ptr<PostingIterator>& postingIt) { if (!_metaFileWriter) { return Status::OK(); } std::string metaValue; GenerateMetaValue(postingIt, dictKey, metaValue); if (!metaValue.empty()) { auto [st, _] = _metaFileWriter->Write(metaValue.c_str(), metaValue.size()).StatusWith(); RETURN_IF_STATUS_ERROR(st, "write truncate meta failed."); } return Status::OK(); } void SingleTruncateIndexWriter::GenerateMetaValue(const std::shared_ptr<PostingIterator>& postingIt, const DictKeyInfo& dictKey, std::string& metaValue) { std::string value; AcquireLastDocValue(postingIt, value); std::string key = dictKey.ToString(); metaValue = key + "\t" + value + "\n"; } void SingleTruncateIndexWriter::AcquireLastDocValue(const std::shared_ptr<PostingIterator>& postingIt, std::string& value) { docid_t docId = _collector->GetMinValueDocId(); DocInfo* docInfo = _docInfoAllocator->Allocate(); docInfo->SetDocId(docId); postingIt->Reset(); postingIt->SeekDoc(docId); _evaluator->Evaluate(docId, postingIt, docInfo); value = _sortFieldRef->GetStringValue(docInfo); int64_t int64Value = 0; if (!autil::StringUtil::fromString(value, int64Value)) { double doubleValue = 0; if (autil::StringUtil::fromString(value, doubleValue)) { if (_desc) { value = autil::StringUtil::toString(floor(doubleValue)); } else { value = autil::StringUtil::toString(ceil(doubleValue)); } } } } void SingleTruncateIndexWriter::ResetResource() { _bufferPool->reset(); _byteSlicePool->reset(); _collector->Reset(); } void SingleTruncateIndexWriter::ReleasePoolResource() { if (_byteSlicePool) { _byteSlicePool->release(); delete _byteSlicePool; _byteSlicePool = NULL; } if (_bufferPool) { _bufferPool->release(); delete _bufferPool; _bufferPool = NULL; } if (_allocator) { delete _allocator; _allocator = NULL; } } void SingleTruncateIndexWriter::ReleaseMetaResource() { if (_docInfoAllocator) { _docInfoAllocator->Release(); } if (_metaFileWriter) { _metaFileWriter->Close().GetOrThrow(); _metaFileWriter.reset(); } } void SingleTruncateIndexWriter::ReleaseTruncateIndexResource() { for (auto& indexOutputSegmentResource : _outputSegmentResources) { indexOutputSegmentResource->Reset(); } _outputSegmentResources.clear(); _truncateDictKeySet.clear(); _hasNullTerm = false; if (_collector) { _collector.reset(); AUTIL_LOG(DEBUG, "clear collector"); } } int64_t SingleTruncateIndexWriter::EstimateMemoryUse(int64_t maxPostingLen, uint32_t totalDocCount, size_t outputSegmentCount) const { int64_t size = sizeof(*this); size += maxPostingLen; // for build truncate index int64_t writeBufferSize = _ioConfig.GetWriteBufferSize() * 2 * outputSegmentCount; // dict and data AUTIL_LOG(INFO, "SingleTruncateIndexWriter: write buffer [%ld MB]", writeBufferSize / 1024 / 1024); size += writeBufferSize; int64_t collectorMemUse = _collector->EstimateMemoryUse(totalDocCount); AUTIL_LOG(INFO, "SingleTruncateIndexWriter: collectorMemUse [%ld MB]", collectorMemUse / 1024 / 1024); size += collectorMemUse; return size; } } // namespace indexlib::index