aios/storage/indexlib/index/common/data_structure/VarLenDataReader.h (336 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "autil/Log.h"
#include "indexlib/file_system//file/CompressFileReader.h"
#include "indexlib/file_system//file/FileReader.h"
#include "indexlib/file_system/Directory.h"
#include "indexlib/file_system/stream/FileStream.h"
#include "indexlib/file_system/stream/FileStreamCreator.h"
#include "indexlib/index/common/data_structure/VarLenOffsetReader.h"
#include "indexlib/index/common/field_format/attribute/MultiValueAttributeFormatter.h"
namespace indexlibv2::index {
class VarLenDataReader
{
public:
    /// @param param    layout options consulted by the read paths below
    ///                 (e.g. appendDataItemLength, disableGuardOffset, dataItemUniqEncode).
    /// @param isOnline selects the online (session-reader based) or the offline access path.
    VarLenDataReader(const VarLenDataParam& param, bool isOnline);
    ~VarLenDataReader();

public:
    /// Opens `offsetFileName` / `dataFileName` under `directory` for `docCount` documents.
    /// NOTE(review): defined out of line; exact loading behaviour is not visible in this header.
    Status Init(uint32_t docCount, const std::shared_ptr<indexlib::file_system::IDirectory>& directory,
                const std::string& offsetFileName, const std::string& dataFileName);

    /// Start offset of `docId` in the data file, looked up through the online or offline offset reader.
    inline std::pair<Status, uint64_t> GetOffset(docid_t docId) const
    {
        if (_isOnline) {
            return _offsetReader.GetOffset(docId);
        }
        return _offlineOffsetReader.GetOffset(docId);
    }

    /// Number of documents, as reported by the (online) offset reader.
    inline uint32_t GetDocCount() const { return _offsetReader.GetDocCount(); }

    /// Reads the value of one document into `value`.
    /// The bool is false when the value could not be produced; a non-OK Status is propagated from
    /// offset lookups / file reads. For compressed data `pool` must be an autil::mem_pool::Pool.
    inline std::pair<Status, bool> GetValue(docid_t docId, autil::StringView& value,
                                            autil::mem_pool::PoolBase* pool) const __ALWAYS_INLINE;

    /// Batch read. `docIds` must be sorted ascending and `pool` must be non-null.
    /// Produces one ErrorCode per docId and appends one StringView per docId to `*data`.
    future_lite::coro::Lazy<indexlib::index::ErrorCodeVec>
    GetValue(const std::vector<docid_t>& docIds, autil::mem_pool::PoolBase* pool,
             indexlib::file_system::ReadOption readOption, std::vector<autil::StringView>* data) const noexcept;

    const std::shared_ptr<indexlib::file_system::FileReader>& GetOffsetFileReader() const
    {
        return _offsetReader.GetFileReader();
    }
    const std::shared_ptr<indexlib::file_system::FileReader>& GetDataFileReader() const { return _dataFileReader; }
    size_t EvaluateCurrentMemUsed();

private:
    inline std::pair<Status, bool> GetOffsetAndLength(indexlib::file_system::FileReader* fileReader, docid_t docId,
                                                      uint64_t& offset, uint32_t& length) const __ALWAYS_INLINE;
    inline std::pair<Status, bool> GetValue(indexlib::file_system::FileReader* fileReader, docid_t docId,
                                            autil::StringView& value,
                                            autil::mem_pool::PoolBase* pool) const __ALWAYS_INLINE;
    future_lite::coro::Lazy<indexlib::index::ErrorCodeVec>
    GetOffsetAndLength(const std::shared_ptr<indexlib::file_system::FileStream>& fileStream,
                       const std::vector<docid_t>& docIds, autil::mem_pool::PoolBase* sessionPool,
                       indexlib::file_system::ReadOption readOption, std::vector<uint64_t>* offsets,
                       std::vector<uint32_t>* lens) const noexcept;

private:
    autil::mem_pool::Pool _offlinePool;
    VarLenOffsetReader _offsetReader;
    VarLenOffsetReader _offlineOffsetReader;
    std::shared_ptr<indexlib::file_system::FileReader> _dataFileReader;
    // Base address of in-memory data, or nullptr when values must be read through a file reader.
    // Default-initialized so every read path sees a well-defined value even before Init().
    char* _dataBaseAddr = nullptr;
    // Total data length; used as the end offset of the last doc when the guard offset is disabled.
    size_t _dataLength = 0;
    VarLenDataParam _param;
    bool _isOnline = false;
    bool _dataFileCompress = false;

private:
    AUTIL_LOG_DECLARE();
};
///////////////////////////////////////////////////////////////////////////////////////
inline std::pair<Status, bool> VarLenDataReader::GetValue(docid_t docId, autil::StringView& value,
                                                          autil::mem_pool::PoolBase* pool) const
{
    // Plain (uncompressed) data: read directly through the shared data reader.
    if (_dataFileCompress == false) {
        return GetValue(_dataFileReader.get(), docId, value, pool);
    }

    // Compressed data requires a concrete Pool, on both the online and the offline path.
    auto* concretePool = dynamic_cast<autil::mem_pool::Pool*>(pool);
    if (concretePool == nullptr) {
        AUTIL_LOG(ERROR, "read value fail, pool should not be null for compressed data.");
        return std::make_pair(Status::OK(), false);
    }

    const auto compressReader = std::static_pointer_cast<indexlib::file_system::CompressFileReader>(_dataFileReader);
    if (_isOnline) {
        // Online: read through a session reader created from the caller's pool; the guard object
        // presumably releases it when this scope exits.
        indexlib::file_system::CompressFileReader* sessionReader = compressReader->CreateSessionReader(concretePool);
        indexlib::file_system::CompressFileReaderGuard sessionGuard(sessionReader, concretePool);
        return GetValue(sessionReader, docId, value, pool);
    }
    // Offline: read through the shared compressed reader itself.
    return GetValue(compressReader.get(), docId, value, pool);
}
inline std::pair<Status, bool> VarLenDataReader::GetValue(indexlib::file_system::FileReader* dataReader, docid_t docId,
                                                          autil::StringView& value,
                                                          autil::mem_pool::PoolBase* pool) const
{
    // Locate the item first: start offset and byte length of the value for this doc.
    uint64_t itemOffset = 0;
    uint32_t itemLen = 0;
    auto [locateStatus, located] = GetOffsetAndLength(dataReader, docId, itemOffset, itemLen);
    RETURN2_IF_STATUS_ERROR(locateStatus, false, "get data offset and length fail");
    if (located == false) {
        return std::make_pair(Status::OK(), false);
    }

    // In-memory data: return a view into it without copying.
    if (_dataBaseAddr != nullptr) {
        assert(!_dataFileCompress);
        value = autil::StringView(_dataBaseAddr + itemOffset, itemLen);
        return std::make_pair(Status::OK(), true);
    }

    // File-backed data: copy the bytes into a pool-allocated buffer.
    if (pool == nullptr) {
        AUTIL_LOG(ERROR, "read value fail, pool should not be null.");
        return std::make_pair(Status::OK(), false);
    }
    auto* itemBuffer = static_cast<char*>(pool->allocate(itemLen));
    auto [readStatus, bytesRead] = dataReader->Read(itemBuffer, itemLen, itemOffset).StatusWith();
    RETURN2_IF_STATUS_ERROR(readStatus, false, "read data form file fail");
    if (bytesRead == static_cast<size_t>(itemLen)) {
        value = autil::StringView(itemBuffer, itemLen);
        return std::make_pair(Status::OK(), true);
    }
    AUTIL_LOG(ERROR, "read value fail from file [%s], offset [%lu], len [%u]", dataReader->DebugString().c_str(),
              itemOffset, itemLen);
    return std::make_pair(Status::OK(), false);
}
/// Resolves the data offset and byte length of `docId`.
/// - Without appendDataItemLength the length is (offset of docId+1) - (offset of docId); when the guard
///   offset is disabled the last doc ends at _dataLength.
/// - With appendDataItemLength the item starts with an encoded count prefix; the returned `offset`
///   points past that prefix and `length` is the decoded count.
/// Returns {OK, false} for invalid input / short or corrupt reads, and a non-OK Status for I/O errors.
inline std::pair<Status, bool> VarLenDataReader::GetOffsetAndLength(indexlib::file_system::FileReader* dataReader,
                                                                    docid_t docId, uint64_t& offset,
                                                                    uint32_t& length) const
{
    if (docId >= GetDocCount()) {
        AUTIL_LOG(ERROR, "invalid docId [%d], over docCount [%u]", docId, GetDocCount());
        return std::make_pair(Status::OK(), false);
    }
    if (_param.dataItemUniqEncode && !_param.appendDataItemLength) {
        AUTIL_LOG(ERROR, "var_len data structure not support enable dataItemUniqEncode, "
                         "but not set appendDataItemLength");
        return std::make_pair(Status::OK(), false);
    }
    auto [status, curOffset] = GetOffset(docId);
    RETURN2_IF_STATUS_ERROR(status, false, "get offset for doc [%d] fail", docId);
    offset = curOffset;
    if (!_param.appendDataItemLength) {
        assert(!_param.dataItemUniqEncode);
        uint64_t nextOffset = 0;
        if (unlikely(_param.disableGuardOffset && (docId + 1) == (docid_t)GetDocCount())) {
            // No guard entry after the last doc: it extends to the end of the data.
            nextOffset = _dataLength;
        } else {
            std::tie(status, curOffset) = GetOffset(docId + 1);
            RETURN2_IF_STATUS_ERROR(status, false, "get next offset fail for doc [%d]", docId);
            nextOffset = curOffset;
        }
        length = nextOffset - offset;
        return std::make_pair(Status::OK(), true);
    }

    size_t encodeCountLen = 0;
    bool isNull = false;
    if (_dataBaseAddr) {
        const char* countPtr = _dataBaseAddr + offset;
        length = MultiValueAttributeFormatter::DecodeCount(countPtr, encodeCountLen, isNull);
    } else {
        // Large enough for the longest encoded count; the first byte tells how many bytes are used.
        char countBuf[5];
        auto [readStatus, retLen] = dataReader->Read(countBuf, sizeof(uint8_t), offset).StatusWith();
        RETURN2_IF_STATUS_ERROR(readStatus, false, "read data from file fail");
        if (retLen != sizeof(uint8_t)) {
            AUTIL_LOG(ERROR, "read count header fail from file [%s], offset [%lu]", dataReader->DebugString().c_str(),
                      offset);
            return std::make_pair(Status::OK(), false);
        }
        encodeCountLen = MultiValueAttributeFormatter::GetEncodedCountFromFirstByte(*countBuf);
        // Reject corrupt headers: 0 would underflow the remaining length, >5 would overflow countBuf.
        if (encodeCountLen == 0 || encodeCountLen > sizeof(countBuf)) {
            AUTIL_LOG(ERROR, "invalid encoded count length [%lu] in file [%s], offset [%lu]", encodeCountLen,
                      dataReader->DebugString().c_str(), offset);
            return std::make_pair(Status::OK(), false);
        }
        // Only issue a second read when the count spans more than one byte (skips a zero-length I/O).
        if (encodeCountLen > sizeof(uint8_t)) {
            const size_t remainLen = encodeCountLen - sizeof(uint8_t);
            std::tie(readStatus, retLen) =
                dataReader->Read(countBuf + sizeof(uint8_t), remainLen, offset + sizeof(uint8_t)).StatusWith();
            RETURN2_IF_STATUS_ERROR(readStatus, false, "read data from file fail");
            if (retLen != remainLen) {
                AUTIL_LOG(ERROR, "read count body fail from file [%s], offset [%lu], len [%lu]",
                          dataReader->DebugString().c_str(), offset, remainLen);
                return std::make_pair(Status::OK(), false);
            }
        }
        length = MultiValueAttributeFormatter::DecodeCount(countBuf, encodeCountLen, isNull);
    }
    // Callers read the payload, which starts right after the count prefix.
    offset += encodeCountLen;
    return std::make_pair(Status::OK(), true);
}
/// Batch form of GetOffsetAndLength. For each docIds[i], fills (*offsets)[i] and (*lens)[i] and
/// reports a per-doc ErrorCode. Docs whose offset lookup or count read failed carry a non-OK code.
/// `fileStream` is only used when data is not in memory (_dataBaseAddr == nullptr) and lengths are
/// stored as a count prefix.
inline future_lite::coro::Lazy<indexlib::index::ErrorCodeVec>
VarLenDataReader::GetOffsetAndLength(const std::shared_ptr<indexlib::file_system::FileStream>& fileStream,
                                     const std::vector<docid_t>& docIds, autil::mem_pool::PoolBase* sessionPool,
                                     indexlib::file_system::ReadOption readOption, std::vector<uint64_t>* offsets,
                                     std::vector<uint32_t>* lens) const noexcept
{
    if (docIds.empty()) {
        co_return indexlib::index::ErrorCodeVec();
    }
    lens->resize(docIds.size());
    indexlib::index::ErrorCodeVec result(docIds.size(), indexlib::index::ErrorCode::OK);
    const uint32_t docCount = GetDocCount();
    for (docid_t docId : docIds) {
        if (docId >= docCount) {
            AUTIL_LOG(ERROR, "invalid docId [%d], over docCount [%u]", docId, docCount);
            co_return indexlib::index::ErrorCodeVec(docIds.size(), indexlib::index::ErrorCode::Runtime);
        }
    }
    if (_param.dataItemUniqEncode && !_param.appendDataItemLength) {
        AUTIL_LOG(ERROR, "var_len data structure not support enable dataItemUniqEncode, "
                         "but not set appendDataItemLength");
        co_return indexlib::index::ErrorCodeVec(docIds.size(), indexlib::index::ErrorCode::Runtime);
    }
    auto sessionOffsetReader = _offsetReader.CreateSessionReader(dynamic_cast<autil::mem_pool::Pool*>(sessionPool));

    if (!_param.appendDataItemLength) {
        assert(!_param.dataItemUniqEncode);
        // length = offset(doc + 1) - offset(doc). Without a guard offset there is no entry after the
        // last doc, so it ends at _dataLength. This is decided per doc, so repeated last docIds are safe.
        const docid_t lastDocId = static_cast<docid_t>(docCount) - 1;
        const bool noGuardOffset = _param.disableGuardOffset;
        std::vector<docid_t> targetDocIds;
        targetDocIds.reserve(2 * docIds.size());
        for (docid_t docId : docIds) {
            targetDocIds.push_back(docId);
            if (!(noGuardOffset && docId == lastDocId)) {
                targetDocIds.push_back(docId + 1);
            }
        }
        std::vector<uint64_t> targetOffsets;
        auto offsetResult = co_await sessionOffsetReader.GetOffset(targetDocIds, readOption, &targetOffsets);
        offsets->reserve(offsets->size() + docIds.size());
        size_t cursor = 0;
        for (size_t i = 0; i < docIds.size(); ++i) {
            const uint64_t beginOffset = targetOffsets[cursor];
            bool offsetOk = (offsetResult[cursor] == indexlib::index::ErrorCode::OK);
            ++cursor;
            uint64_t endOffset = _dataLength;
            if (!(noGuardOffset && docIds[i] == lastDocId)) {
                endOffset = targetOffsets[cursor];
                offsetOk = offsetOk && (offsetResult[cursor] == indexlib::index::ErrorCode::OK);
                ++cursor;
            }
            offsets->push_back(beginOffset);
            (*lens)[i] = static_cast<uint32_t>(endOffset - beginOffset);
            if (!offsetOk) {
                result[i] = indexlib::index::ErrorCode::Runtime;
            }
        }
        co_return result;
    }

    // Lengths are stored as an encoded count prefix in front of each item.
    auto offsetResult = co_await sessionOffsetReader.GetOffset(docIds, readOption, offsets);
    bool isNull = false;
    if (_dataBaseAddr) {
        for (size_t i = 0; i < docIds.size(); ++i) {
            if (offsetResult[i] != indexlib::index::ErrorCode::OK) {
                result[i] = offsetResult[i];
                continue;
            }
            size_t encodeCountLen = 0;
            const char* countPtr = _dataBaseAddr + (*offsets)[i];
            (*lens)[i] = MultiValueAttributeFormatter::DecodeCount(countPtr, encodeCountLen, isNull);
            (*offsets)[i] += encodeCountLen;
        }
        co_return result;
    }

    // File-backed: read up to kMaxCountLen header bytes per doc in a single batch.
    constexpr size_t kMaxCountLen = 5;
    char* batchBufferPtr = IE_POOL_COMPATIBLE_NEW_VECTOR(sessionPool, char, kMaxCountLen * docIds.size());
    indexlib::file_system::BatchIO batchIO(docIds.size());
    size_t ioCount = 0;
    const size_t streamLen = fileStream->GetStreamLength();
    for (size_t i = 0; i < docIds.size(); ++i) {
        // Propagate failed offset lookups instead of silently reporting OK with an empty length.
        if (offsetResult[i] != indexlib::index::ErrorCode::OK) {
            result[i] = offsetResult[i];
            continue;
        }
        const uint64_t itemOffset = (*offsets)[i];
        // Every item has at least one count byte, so its offset must lie strictly inside the stream.
        if (itemOffset >= streamLen) {
            AUTIL_LOG(ERROR, "invalid data offset [%lu] for doc [%d], stream length [%lu]", itemOffset, docIds[i],
                      streamLen);
            result[i] = indexlib::index::ErrorCode::Runtime;
            continue;
        }
        const size_t readLen = std::min(kMaxCountLen, streamLen - itemOffset);
        batchIO[ioCount] =
            indexlib::file_system::SingleIO(batchBufferPtr + ioCount * kMaxCountLen, readLen, itemOffset);
        ++ioCount;
    }
    batchIO.resize(ioCount);
    auto ioResult = co_await fileStream->BatchRead(batchIO, readOption);
    assert(ioResult.size() == batchIO.size());

    // After the loop above, result[i] is still OK exactly for the docs that got an I/O slot.
    size_t ioIdx = 0;
    for (size_t i = 0; i < docIds.size(); ++i) {
        if (result[i] != indexlib::index::ErrorCode::OK) {
            continue;
        }
        assert(ioIdx < batchIO.size());
        const size_t slot = ioIdx++;
        if (!ioResult[slot].OK()) {
            result[i] = indexlib::index::ConvertFSErrorCode(ioResult[slot].ec);
            continue;
        }
        const char* countBuf = static_cast<const char*>(batchIO[slot].buffer);
        const size_t encodeCountLen = MultiValueAttributeFormatter::GetEncodedCountFromFirstByte(*countBuf);
        // Never decode more bytes than were actually requested for this doc (corrupt header / stream tail).
        if (encodeCountLen == 0 || encodeCountLen > batchIO[slot].len) {
            AUTIL_LOG(ERROR, "invalid encoded count length [%lu] for doc [%d]", encodeCountLen, docIds[i]);
            result[i] = indexlib::index::ErrorCode::Runtime;
            continue;
        }
        (*lens)[i] = MultiValueAttributeFormatter::DecodeCount(countBuf, encodeCountLen, isNull);
        (*offsets)[i] += encodeCountLen;
    }
    IE_POOL_COMPATIBLE_DELETE_VECTOR(sessionPool, batchBufferPtr, kMaxCountLen * docIds.size());
    co_return result;
}
/// Batch value read (online only). `docIds` must be sorted ascending and `pool` non-null.
/// Appends exactly one StringView per docId to `*data` (empty on failure) and returns one ErrorCode
/// per docId. In-memory data is returned as views; file-backed data is copied into `pool`.
inline future_lite::coro::Lazy<indexlib::index::ErrorCodeVec>
VarLenDataReader::GetValue(const std::vector<docid_t>& docIds, autil::mem_pool::PoolBase* pool,
                           indexlib::file_system::ReadOption readOption,
                           std::vector<autil::StringView>* data) const noexcept
{
    if (!std::is_sorted(docIds.begin(), docIds.end())) {
        AUTIL_LOG(ERROR, "read value fail, docids is not increasing.");
        co_return indexlib::index::ErrorCodeVec(docIds.size(), indexlib::index::ErrorCode::Runtime);
    }
    assert(_isOnline);
    if (!pool) {
        AUTIL_LOG(ERROR, "read value fail, pool is required.");
        co_return indexlib::index::ErrorCodeVec(docIds.size(), indexlib::index::ErrorCode::Runtime);
    }
    if (docIds.empty()) {
        // Nothing to read; avoid creating a file stream for an empty batch.
        co_return indexlib::index::ErrorCodeVec();
    }

    std::shared_ptr<indexlib::file_system::FileStream> fileStream;
    if (!_dataBaseAddr) {
        fileStream = indexlib::file_system::FileStreamCreator::CreateFileStream(
            _dataFileReader, dynamic_cast<autil::mem_pool::Pool*>(pool));
        // GetOffsetAndLength and BatchRead below dereference the stream unconditionally.
        if (!fileStream) {
            AUTIL_LOG(ERROR, "read value fail, create data file stream fail.");
            co_return indexlib::index::ErrorCodeVec(docIds.size(), indexlib::index::ErrorCode::Runtime);
        }
    }

    indexlib::index::ErrorCodeVec ret(docIds.size(), indexlib::index::ErrorCode::OK);
    std::vector<uint64_t> offsets;
    std::vector<uint32_t> lens;
    auto offsetResult = co_await GetOffsetAndLength(fileStream, docIds, pool, readOption, &offsets, &lens);
    assert(offsetResult.size() == docIds.size());
    data->reserve(data->size() + docIds.size());

    if (_dataBaseAddr) {
        // In-memory data: hand out views, no copy.
        assert(!_dataFileCompress);
        for (size_t i = 0; i < docIds.size(); ++i) {
            if (offsetResult[i] == indexlib::index::ErrorCode::OK) {
                data->push_back(autil::StringView(_dataBaseAddr + offsets[i], lens[i]));
            } else {
                data->push_back(autil::StringView());
                ret[i] = offsetResult[i];
            }
        }
        co_return ret;
    }

    // File-backed data: one pool-allocated buffer per successfully located doc, read in a single batch.
    indexlib::file_system::BatchIO batchIO;
    for (size_t i = 0; i < docIds.size(); ++i) {
        if (indexlib::index::ErrorCode::OK == offsetResult[i]) {
            batchIO.emplace_back(pool->allocate(lens[i]), lens[i], offsets[i]);
        }
    }
    auto dataReadResult = co_await fileStream->BatchRead(batchIO, readOption);
    assert(dataReadResult.size() == batchIO.size());

    size_t ioIdx = 0;
    for (size_t i = 0; i < docIds.size(); ++i) {
        if (offsetResult[i] != indexlib::index::ErrorCode::OK) {
            ret[i] = offsetResult[i];
            data->push_back(autil::StringView());
            continue;
        }
        const size_t slot = ioIdx++;
        if (dataReadResult[slot].OK()) {
            data->push_back(
                autil::StringView(static_cast<const char*>(batchIO[slot].buffer), batchIO[slot].len));
        } else {
            ret[i] = indexlib::index::ConvertFSErrorCode(dataReadResult[slot].ec);
            data->push_back(autil::StringView());
        }
    }
    co_return ret;
}
} // namespace indexlibv2::index