aios/storage/indexlib/framework/Tablet.cpp (1,856 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "indexlib/framework/Tablet.h" #include <algorithm> #include <assert.h> #include <chrono> #include <exception> #include <limits> #include <ostream> #include <stddef.h> #include <type_traits> #include "autil/CommonMacros.h" #include "autil/EnvUtil.h" #include "autil/Scope.h" #include "autil/StringUtil.h" #include "autil/TimeUtility.h" #include "autil/UnitUtil.h" #include "autil/WorkItemQueue.h" #include "autil/legacy/exception.h" #include "autil/legacy/legacy_jsonizable.h" #include "autil/legacy/legacy_jsonizable_dec.h" #include "future_lite/TaskScheduler.h" #include "future_lite/Try.h" #include "future_lite/coro/Lazy.h" #include "future_lite/coro/LazyHelper.h" #include "indexlib/base/Constant.h" #include "indexlib/base/MemoryQuotaController.h" #include "indexlib/base/MemoryQuotaSynchronizer.h" #include "indexlib/base/PathUtil.h" #include "indexlib/config/BackgroundTaskConfig.h" #include "indexlib/config/BuildConfig.h" #include "indexlib/config/FieldConfig.h" #include "indexlib/config/IIndexConfig.h" #include "indexlib/config/ITabletSchema.h" #include "indexlib/config/IndexTaskConfig.h" #include "indexlib/config/OnlineConfig.h" #include "indexlib/config/TabletOptions.h" #include "indexlib/document/IDocumentBatch.h" #include "indexlib/document/IDocumentParser.h" #include "indexlib/file_system/Directory.h" #include "indexlib/file_system/ErrorCode.h" #include 
"indexlib/file_system/FSResult.h" #include "indexlib/file_system/FileBlockCacheContainer.h" #include "indexlib/file_system/FileSystemCreator.h" #include "indexlib/file_system/FileSystemDefine.h" #include "indexlib/file_system/FileSystemMetricsReporter.h" #include "indexlib/file_system/FileSystemOptions.h" #include "indexlib/file_system/IFileSystem.h" #include "indexlib/file_system/LifecycleConfig.h" #include "indexlib/file_system/LifecycleTable.h" #include "indexlib/file_system/MountOption.h" #include "indexlib/file_system/ReaderOption.h" #include "indexlib/file_system/fslib/FenceContext.h" #include "indexlib/file_system/fslib/FslibWrapper.h" #include "indexlib/file_system/load_config/LoadConfigList.h" #include "indexlib/framework/BuildResource.h" #include "indexlib/framework/DefaultMemoryControlStrategy.h" #include "indexlib/framework/DeployIndexUtil.h" #include "indexlib/framework/DiskSegment.h" #include "indexlib/framework/Fence.h" #include "indexlib/framework/IMemoryControlStrategy.h" #include "indexlib/framework/IMetrics.h" #include "indexlib/framework/ITabletImporter.h" #include "indexlib/framework/ITabletLoader.h" #include "indexlib/framework/IndexRecoverStrategy.h" #include "indexlib/framework/MemSegmentCreator.h" #include "indexlib/framework/MetricsManager.h" #include "indexlib/framework/ResourceMap.h" #include "indexlib/framework/Segment.h" #include "indexlib/framework/SegmentDumper.h" #include "indexlib/framework/SegmentInfo.h" #include "indexlib/framework/SegmentMeta.h" #include "indexlib/framework/TabletCommitter.h" #include "indexlib/framework/TabletDataSchemaGroup.h" #include "indexlib/framework/TabletDumper.h" #include "indexlib/framework/TabletFactoryCreator.h" #include "indexlib/framework/TabletId.h" #include "indexlib/framework/TabletInfos.h" #include "indexlib/framework/TabletReader.h" #include "indexlib/framework/TabletReaderContainer.h" #include "indexlib/framework/TabletSchemaLoader.h" #include "indexlib/framework/TabletSchemaManager.h" 
#include "indexlib/framework/TaskType.h"
#include "indexlib/framework/VersionDeployDescription.h"
#include "indexlib/framework/VersionLine.h"
#include "indexlib/framework/VersionLoader.h"
#include "indexlib/framework/VersionMetaCreator.h"
#include "indexlib/framework/cleaner/DropIndexCleaner.h"
#include "indexlib/framework/cleaner/OnDiskIndexCleaner.h"
#include "indexlib/framework/cleaner/ResourceCleaner.h"
#include "indexlib/framework/index_task/Constant.h"
#include "indexlib/framework/lifecycle/LifecycleTableCreator.h"
#include "indexlib/framework/mem_reclaimer/EpochBasedMemReclaimer.h"
#include "indexlib/framework/mem_reclaimer/MemReclaimerMetrics.h"
#include "indexlib/util/Exception.h"
#include "indexlib/util/KeyValueMap.h"
#include "indexlib/util/PathUtil.h"
#include "indexlib/util/cache/SearchCachePartitionWrapper.h"
#include "indexlib/util/memory_control/BuildResourceMetrics.h"
#include "indexlib/util/memory_control/MemoryReserver.h"
#include "indexlib/util/metrics/Metric.h"
#include "indexlib/util/metrics/MetricProvider.h"

namespace indexlibv2::framework {
AUTIL_LOG_SETUP(indexlib.framework, Tablet);

// Logging helpers that prefix every message with the tablet name and the address of this Tablet instance.
#define TABLET_LOG(level, format, args...)                                                                             \
    AUTIL_LOG(level, "[%s] [%p] " format, _tabletInfos->GetTabletName().c_str(), this, ##args)
#define TABLET_INTERVAL_LOG2(logInterval, level, format, args...)                                                      \
    AUTIL_INTERVAL_LOG2(logInterval, level, "[%s] [%p] " format, _tabletInfos->GetTabletName().c_str(), this, ##args)

// Wires the tablet to the shared resources in `resource`. Optional resources that are not supplied
// (memory quota controller, file block cache container) are replaced by tablet-private defaults.
Tablet::Tablet(const TabletResource& resource)
    : _tabletInfos(std::make_unique<TabletInfos>())
    , _needSeek(false)
    , _tabletCommitter(std::make_unique<TabletCommitter>(resource.tabletId.GenerateTabletName()))
    , _tabletDumper(std::make_unique<TabletDumper>(resource.tabletId.GenerateTabletName(), resource.dumpExecutor,
                                                   _tabletCommitter.get()))
    , _mergeController(resource.mergeController)
    , _idGenerator(resource.idGenerator)
    , _isClosed(false)
{
    _tabletInfos->SetTabletId(resource.tabletId);
    if (resource.memoryQuotaController) {
        _tabletMemoryQuotaController = std::make_shared<MemoryQuotaController>(resource.tabletId.GenerateTabletName(),
                                                                               resource.memoryQuotaController);
    } else {
        // no parent controller supplied: use a standalone controller with the maximum int64 quota
        _tabletMemoryQuotaController = std::make_shared<MemoryQuotaController>(
            resource.tabletId.GenerateTabletName(), /*totalQuota=*/std::numeric_limits<int64_t>::max());
    }
    if (resource.buildMemoryQuotaController) {
        auto buildMemoryQuotaController = std::make_shared<MemoryQuotaController>(
            resource.tabletId.GenerateTabletName() + "_build", resource.buildMemoryQuotaController);
        _buildMemoryQuotaSynchronizer = std::make_unique<MemoryQuotaSynchronizer>(buildMemoryQuotaController);
    }
    if (resource.taskScheduler) {
        _taskScheduler = std::make_unique<future_lite::NamedTaskScheduler>(resource.taskScheduler);
    }
    if (resource.fileBlockCacheContainer) {
        _fileBlockCacheContainer = resource.fileBlockCacheContainer;
    } else {
        _fileBlockCacheContainer = std::make_shared<indexlib::file_system::FileBlockCacheContainer>();
        _fileBlockCacheContainer->Init(/*configStr=*/"", /*globalMemoryQuotaController=*/nullptr,
                                       /*taskScheduler=*/nullptr, /*metricProvider=*/nullptr);
    }
    if (resource.searchCache) {
        _searchCache = std::make_shared<indexlib::util::SearchCachePartitionWrapper>(
            resource.searchCache, resource.tabletId.GenerateTabletName());
    }
    _metricsManager.reset(new MetricsManager(resource.tabletId.GenerateTabletName(), resource.metricsReporter));
    // now we only support epoch base mem reclaimer
    if (_enableMemReclaimer) {
        auto memReclaimerMetrics = std::dynamic_pointer_cast<MemReclaimerMetrics>(
            _metricsManager->CreateMetrics("MEM_RECLAIMER", [&]() -> std::shared_ptr<framework::IMetrics> {
                return std::make_shared<MemReclaimerMetrics>(_metricsManager->GetMetricsReporter());
            }));
        assert(nullptr != memReclaimerMetrics);
        _memReclaimer.reset(new EpochBasedMemReclaimer(memReclaimerMetrics));
    }
    if (resource.consistentModeBuildThreadPool != nullptr) {
        _consistentModeBuildThreadPool = resource.consistentModeBuildThreadPool;
    }
    if (resource.inconsistentModeBuildThreadPool != nullptr) {
        _inconsistentModeBuildThreadPool = resource.inconsistentModeBuildThreadPool;
    }
    auto range = _tabletInfos->GetTabletId().GetRange();
    TABLET_LOG(INFO, "create tablet end, ipPort[%s:%u], range[%u_%u]", _tabletInfos->GetTabletId().GetIp().c_str(),
               _tabletInfos->GetTabletId().GetPort(), range.first, range.second);
}

// Closes the tablet on destruction.
Tablet::~Tablet() { Close(); }

// Returns a session reader created by the tablet factory around the current tablet reader.
// Takes _readerMutex while reading _tabletReader.
std::shared_ptr<ITabletReader> Tablet::GetTabletReader() const
{
    std::lock_guard<std::mutex> guard(_readerMutex);
    return _tabletFactory->CreateTabletSessionReader(_tabletReader, _memReclaimer);
}

// Returns a non-owning pointer to the tablet infos owned by this tablet.
const TabletInfos* Tablet::GetTabletInfos() const { return _tabletInfos.get(); }

// Returns the write schema of the current tablet data, or nullptr when there is no tablet data.
std::shared_ptr<config::ITabletSchema> Tablet::GetTabletSchema() const
{
    auto tabletData = GetTabletData();
    if (tabletData == nullptr) {
        return nullptr;
    }
    return tabletData->GetWriteSchema();
}

// Returns the tablet options stored by Open().
std::shared_ptr<config::TabletOptions> Tablet::GetTabletOptions() const { return _tabletOptions; }

// Creates a reader (opened only for online tablets) and, unless the tablet is read-only, a writer for
// `tabletData`, then installs `tabletData` as the current tablet data.
// Returns the failing status of the reader/writer Open() call, or OK.
Status Tablet::OpenWriterAndReader(std::shared_ptr<TabletData> tabletData, const framework::OpenOptions& openOptions,
                                   const std::shared_ptr<indexlibv2::framework::VersionDeployDescription>& versionDpDesc)
{
    // create and set reader
    ReadResource readResource = GenerateReadResource();
    auto readSchema = GetReadSchema(tabletData);
    auto tabletReader = _tabletFactory->CreateTabletReader(readSchema);
    if (_tabletOptions->IsOnline()) {
        auto versionId = tabletData->GetOnDiskVersion().GetVersionId();
        TABLET_LOG(INFO, "begin open tablet reader, version id[%d]", versionId);
        auto st = tabletReader->Open(tabletData, readResource);
        if (!st.IsOK()) {
            TABLET_LOG(ERROR, "tablet reader open failed");
            return st;
        }
    } else {
        TABLET_LOG(INFO, "skip open tablet reader for offline");
    }
    {
        std::lock_guard<std::mutex> guard(_readerMutex);
        _tabletReader = std::move(tabletReader);
        _tabletReaderContainer->AddTabletReader(tabletData, _tabletReader, versionDpDesc);
    }
    TABLET_LOG(INFO, "end open tablet reader");

    // create and set writer
    if (unlikely(_tabletOptions->IsReadOnly())) {
        TABLET_LOG(INFO, "table is readonly not open writer");
    } else {
        TABLET_LOG(INFO, "begin open tablet writer, version id[%d]", tabletData->GetOnDiskVersion().GetVersionId());
        decltype(_tabletWriter) tabletWriter;
        auto writeSchema = tabletData->GetWriteSchema();
        assert(writeSchema);
        auto slice = tabletData->CreateSlice(Segment::SegmentStatus::ST_BUILDING);
        if (!slice.empty()) {
            BuildResource buildResource = GenerateBuildResource(COUNTER_PREFIX);
            tabletWriter = _tabletFactory->CreateTabletWriter(writeSchema);
            Status st = tabletWriter->Open(tabletData, buildResource, openOptions);
            if (!st.IsOK()) {
                TABLET_LOG(ERROR, "tablet writer open failed");
                return st;
            }
        } else {
            // No building segment in the new tablet data: the writer stays null. For an online tablet whose
            // current locator is valid and differs from the new one, clear the build locator and set
            // _needSeek so that the next Build() returns Uninitialize ("need re seek source").
            auto currentLocator = GetTabletData()->GetLocator();
            assert(!_tabletOptions->IsOnline() || currentLocator == GetTabletInfos()->GetLatestLocator());
            if (_tabletOptions->IsOnline() && currentLocator.IsValid() && currentLocator != tabletData->GetLocator()) {
                TABLET_LOG(WARN, "drop realtime, request reseek upstream, current locator[%s], new locator[%s]",
                           currentLocator.DebugString().c_str(), tabletData->GetLocator().DebugString().c_str());
                _tabletInfos->SetBuildLocator(Locator());
                _needSeek.store(true);
            }
        }
        CloseWriterUnsafe();
        _tabletWriter = std::move(tabletWriter);
        TABLET_LOG(INFO, "end open tablet writer");
    }
    SetTabletData(std::move(tabletData));
    _tabletInfos->SetLastReopenLocator(GetTabletData()->GetLocator());
    UpdateDocCount();
    ReportMetrics(_tabletWriter);
    return Status::OK();
}

// Refreshes the tablet data with a new building segment created for `schema`.
Status Tablet::ReopenNewSegment(const std::shared_ptr<config::ITabletSchema>& schema)
{
    return RefreshTabletData(RefreshStrategy::NEW_BUILDING_SEGMENT, schema);
}

// Rebuilds the tablet data from the current segments according to `strategy`:
//   REPLACE_BUILDING_SEGMENT drops a trailing ST_BUILDING segment before a new mem segment is appended;
//   NEW_BUILDING_SEGMENT appends a new mem segment to the existing ones.
// For any other strategy no mem segment is created. The result is reopened via OpenWriterAndReader().
Status Tablet::RefreshTabletData(RefreshStrategy strategy, const std::shared_ptr<config::ITabletSchema>& writeSchema)
{
    // assert dataMutex is held
    indexlib::util::ScopeLatencyReporter scopeLatency(_tabletMetrics->GetreopenRealtimeLatencyMetric().get());
    assert(GetTabletData() != nullptr);
    auto tabletData = GetTabletData();
    auto slice = tabletData->CreateSlice();
    std::vector<std::shared_ptr<Segment>> segments;
    segments.insert(segments.end(), slice.begin(), slice.end());
    if (strategy == RefreshStrategy::REPLACE_BUILDING_SEGMENT) {
        if (!slice.empty() && (*slice.rbegin())->GetSegmentStatus() == Segment::SegmentStatus::ST_BUILDING) {
            segments.pop_back();
        }
    }
    if (strategy == RefreshStrategy::NEW_BUILDING_SEGMENT || strategy == RefreshStrategy::REPLACE_BUILDING_SEGMENT) {
        BuildResource buildResource = GenerateBuildResource(COUNTER_PREFIX);
        auto [status, memSegment] = _memSegmentCreator->CreateMemSegment(buildResource, GetTabletData(), writeSchema);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "create mem segment failed, memSegment is null");
            return status;
        }
        assert(memSegment != nullptr);
        segments.push_back(memSegment);
    }
    auto newTabletData = std::make_shared<TabletData>(_tabletInfos->GetTabletName());
    assert(tabletData->GetResourceMap());
    auto st = newTabletData->Init(tabletData->GetOnDiskVersion().Clone(), std::move(segments),
                                  tabletData->GetResourceMap()->Clone());
    if (!st.IsOK()) {
        TABLET_LOG(ERROR, "init tablet data failed: %s", st.ToString().c_str());
        return st;
    }
    st = FinalizeTabletData(newTabletData.get(), writeSchema);
    if (!st.IsOK()) {
        TABLET_LOG(ERROR, "finalize tablet data failed: %s", st.ToString().c_str());
        return st;
    }
    st = OpenWriterAndReader(std::move(newTabletData), _openOptions, _tabletInfos->GetLoadedVersionDeployDescription());
    return st;
}

// Returns Sealed when the batch's last locator has the same source as the sealed source locator.
// A batch from a different source clears the sealed state.
Status Tablet::CheckDoc(document::IDocumentBatch* batch)
{
    // check if source sealed
    if (_sealedSourceLocator && batch->GetLastLocator().IsSameSrc(_sealedSourceLocator.value(), false)) {
        TABLET_LOG(WARN, "source locator [%s] sealed", _sealedSourceLocator.value().DebugString().c_str());
        return Status::Sealed("sealed");
    }
    if (_sealedSourceLocator) {
        _tabletCommitter->SetSealed(false);
        _sealedSourceLocator.reset();
    }
    return Status::OK();
}

// Builds one document batch into the current building segment.
// Early returns: Unimplement (read-only tablet), InvalidArgs (null batch), NoMem (memory limit reached),
// Uninitialize (a re-seek was requested), OK without building (locator rollback and rollback not allowed).
// When the writer reports NeedDump, the building segment is queued for dump, a new segment is opened and
// the batch is built again.
Status Tablet::Build(const std::shared_ptr<document::IDocumentBatch>& batch)
{
    if (unlikely(_tabletOptions->IsReadOnly())) {
        return Status::Unimplement("readonly tablet not support build");
    }
    if (unlikely(batch == nullptr)) {
        return Status::InvalidArgs("document batch is nullptr");
    }
    auto memStatus = GetTabletInfos()->GetMemoryStatus();
    if (memStatus == indexlibv2::framework::MemoryStatus::REACH_TOTAL_MEM_LIMIT ||
        memStatus == indexlibv2::framework::MemoryStatus::REACH_MAX_RT_INDEX_SIZE) {
        return Status::NoMem("memory status: %d", int(memStatus));
    }
    std::lock_guard<std::mutex> guard(_dataMutex);
    auto status = CheckDoc(batch.get());
    if (!status.IsOK()) {
        return status;
    }
    if (_needSeek.load()) {
        _needSeek.store(false);
        return Status::Uninitialize("need re seek source");
    }
    // check locator forward, ignore locator backward
    auto latestLocator = _tabletInfos->GetLatestLocator();
    const Locator& docLocator = batch->GetLastLocator();
    if ((latestLocator.IsValid() && docLocator.IsValid()) /*locator need valid*/
        && (!latestLocator.IsLegacyLocator() && !docLocator.IsLegacyLocator()) /*locator can not be legacy locator*/
        && latestLocator.IsSameSrc(docLocator, false)) { /*need be same src*/
        auto [from, to] = docLocator.GetLocatorRange();
        if (!latestLocator.ShrinkToRange(from, to) ||
            docLocator.IsFasterThan(latestLocator, true) != Locator::LocatorCompareResult::LCR_FULLY_FASTER) {
            _tabletMetrics->AddTabletFault("locator rollback");
            if (_tabletOptions->GetOnlineConfig().GetAllowLocatorRollback()) {
                TABLET_INTERVAL_LOG2(
                    120, WARN,
                    "doc locator [%s] , tablet lastest locator [%s], shrink or compare failed, but not skip it",
                    batch->GetLastLocator().DebugString().c_str(),
                    _tabletInfos->GetLatestLocator().DebugString().c_str());
            } else {
                TABLET_INTERVAL_LOG2(
                    120, ERROR, "skip doc, doc locator [%s] , tablet lastest locator [%s], shrink or compare failed",
                    batch->GetLastLocator().DebugString().c_str(),
                    _tabletInfos->GetLatestLocator().DebugString().c_str());
                return Status::OK();
            }
        }
    }
    if (_tabletWriter == nullptr) {
        Status st = ReopenNewSegment(GetTabletSchema());
        if (!st.IsOK()) {
            CloseWriterUnsafe();
            TABLET_LOG(ERROR, "reopen new segment failed, error: %s", st.ToString().c_str());
            return st;
        }
    }
    status = _tabletWriter->Build(batch);
    if (status.IsNeedDump()) {
        auto segmentDumper = _tabletWriter->CreateSegmentDumper();
        if (segmentDumper == nullptr) {
            return Status::Corruption("create segment dumper failed");
        }
        _tabletDumper->PushSegmentDumper(std::move(segmentDumper));
        Status st = ReopenNewSegment(GetTabletSchema());
        if (!st.IsOK()) {
            CloseWriterUnsafe();
            TABLET_LOG(ERROR, "reopen new segment failed");
            return st;
        }
        status = _tabletWriter->Build(batch);
    } else if (status.IsNoMem()) {
        // TODO(tianwei) Add background task to clean resource.
    }
    if (status.IsOK()) {
        auto memSegLocator = GetMemSegmentLocator();
        if (memSegLocator.IsValid()) {
            _tabletInfos->SetBuildLocator(memSegLocator);
        }
        UpdateDocCount();
    }
    return status;
}

// Returns the locator of the first ST_BUILDING segment, or a default-constructed Locator when none exists.
Locator Tablet::GetMemSegmentLocator()
{
    auto slice = GetTabletData()->CreateSlice(Segment::SegmentStatus::ST_BUILDING);
    if (!slice.empty()) {
        return (*slice.begin())->GetLocator();
    }
    return Locator();
}

// Returns the root directory of the fence's file system, or nullptr when the fence has no file system.
std::shared_ptr<indexlib::file_system::Directory> Tablet::GetRootDirectory() const
{
    if (!_fence.GetFileSystem()) {
        return nullptr;
    }
    return indexlib::file_system::Directory::Get(_fence.GetFileSystem());
}

// Forwards to the tablet committer.
bool Tablet::NeedCommit() const { return _tabletCommitter->NeedCommit(); }

// Commits the current tablet data as a new version and returns its VersionMeta.
// Holds _reopenMutex for the rest of the call and _cleanerMutex during the commit itself; optionally
// reopens the committed version when commitOptions.NeedReopenInCommit() is set.
// On any failure the returned VersionMeta is default-constructed.
std::pair<Status, VersionMeta> Tablet::Commit(const CommitOptions& commitOptions)
{
    auto currentTabletData = GetTabletData();
    TABLET_LOG(INFO, "begin commit version, target version id [%d]", commitOptions.GetTargetVersionId());
    // do not clean in-memory index while commit
    // otherwise mounted merged version will be erased
    RETURN2_IF_STATUS_ERROR(RenewFenceLease(/*createIfNotExist=*/true), VersionMeta(), "commit version failed");
    VersionMeta commitedVersionMeta;
    Locator commitedLocator;
    std::lock_guard<std::mutex> lockReopen(_reopenMutex);
    {
        std::lock_guard<std::mutex> lockCleaner(_cleanerMutex);
        auto [status, version] =
            _tabletCommitter->Commit(currentTabletData, _fence, _tabletOptions->GetBuildConfig().GetMaxCommitRetryCount(),
                                     _idGenerator.get(), commitOptions);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "commit version failed, error[%s]", status.ToString().c_str());
            return std::make_pair(status, VersionMeta());
        }
        auto [status2, versionMeta] = VersionMetaCreator::Create(GetRootDirectory(), *currentTabletData, version);
        if (!status2.IsOK()) {
            TABLET_LOG(ERROR, "create version meta failed");
            return {status2, VersionMeta()};
        }
        TABLET_LOG(INFO, "end commit version[%d]", version.GetVersionId());
        commitedLocator = version.GetLocator();
        commitedVersionMeta = versionMeta;
    }
    if (commitOptions.NeedReopenInCommit()) {
        TABLET_LOG(INFO, "reopen in commit, versionid[%d]", commitedVersionMeta.GetVersionId());
        auto status =
            DoReopenUnsafe(ReopenOptions(_openOptions),
                           VersionCoord {commitedVersionMeta.GetVersionId(), commitedVersionMeta.GetFenceName()});
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "reopen commited version failed");
            return {status, VersionMeta()};
        }
    }
    _tabletInfos->SetLastCommittedLocator(commitedLocator);
    TABLET_LOG(INFO, "end commit version id [%d] fence [%s], with [%ld] docs, [%lu] segments, size [%.2fG]",
               commitedVersionMeta.GetVersionId(), commitedVersionMeta.GetFenceName().c_str(),
               commitedVersionMeta.GetDocCount(), commitedVersionMeta.GetSegments().size(),
               commitedVersionMeta.GetIndexSize() / 1024.0 / 1024 / 1024);
    return {Status::OK(), commitedVersionMeta};
}

// Loads the version identified by `versionCoord` into `*version` and writes the directory it was resolved
// against into `*versionRoot`. An INVALID_VERSIONID coord yields an empty version and leaves
// `*versionRoot` untouched.
Status Tablet::LoadVersion(const VersionCoord& versionCoord, Version* version, std::string* versionRoot) const
{
    auto versionId = versionCoord.GetVersionId();
    if (versionId == INVALID_VERSIONID) {
        *version = MakeEmptyVersion(GetTabletSchema()->GetSchemaId());
        return Status::OK();
    }
    // 1. if read from remote: root is remote root.
    // 2. if read from local(dp mode): remote root should be set to local root by suez.
    auto indexRoot = _tabletInfos->GetIndexRoot();
    bool fromPublicRoot = false;
    if ((versionId & Version::PRIVATE_VERSION_ID_MASK) && _tabletOptions->IsOnline()) {
        if (_tabletOptions->FlushRemote()) {
            assert(false);
            auto st = Status::Corruption("private mask with flush remote");
            TABLET_LOG(ERROR, "%s", st.ToString().c_str());
            return st;
        } else {
            *versionRoot = PathUtil::JoinPath(indexRoot.GetLocalRoot(), _fence.GetFenceName());
        }
    } else {
        // "" root in entry_table means use the path where entry_table is, so we must use remote root for leader
        *versionRoot = (_tabletOptions->FlushRemote() || _tabletOptions->GetNeedReadRemoteIndex())
                           ? indexRoot.GetRemoteRoot()
                           : indexRoot.GetLocalRoot();
        if (!versionCoord.GetVersionFenceName().empty()) {
            *versionRoot = PathUtil::JoinPath(*versionRoot, versionCoord.GetVersionFenceName());
        } else {
            fromPublicRoot = true;
        }
    }
    auto versionRootDir = indexlib::file_system::Directory::GetPhysicalDirectory(*versionRoot);
    auto status = VersionLoader::GetVersion(versionRootDir, versionId, version);
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "load version [%s] from [%s] failed: %s", versionCoord.DebugString().c_str(),
                   versionRoot->c_str(), status.ToString().c_str());
        return status;
    }
    if (fromPublicRoot) {
        // public version is just a copy without entry table, should locate version file in fence.
        *versionRoot = PathUtil::JoinPath(*versionRoot, version->GetFenceName());
    }
    return Status::OK();
}

// Loads the version for `versionCoord` and mounts it read-only on the fence's file system.
// Returns an empty version (without mounting) when the loaded version is not valid.
std::pair<Status, Version>
Tablet::MountOnDiskVersion(const VersionCoord& versionCoord,
                           const std::shared_ptr<indexlibv2::framework::VersionDeployDescription>& versionDpDesc)
{
    Version version;
    std::string versionRoot;
    auto status = LoadVersion(versionCoord, &version, &versionRoot);
    if (!status.IsOK()) {
        return {status, Version()};
    }
    if (!version.IsValid()) {
        return {Status::OK(), MakeEmptyVersion(GetTabletSchema()->GetSchemaId())};
    }
    auto lfs = _fence.GetFileSystem();
    assert(lfs != nullptr);
    // the lifecycle table is only taken from the deploy description for online tablets
    std::shared_ptr<indexlib::file_system::LifecycleTable> lifecycleTable;
    if (_tabletOptions->IsOnline()) {
        if (versionDpDesc != nullptr) {
            lifecycleTable = versionDpDesc->GetLifecycleTable();
        }
    }
    status = indexlib::file_system::toStatus(lfs->MountVersion(versionRoot, versionCoord.GetVersionId(),
                                                               /*rawLogicalPath=*/"",
                                                               indexlib::file_system::FSMT_READ_ONLY, lifecycleTable));
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "Mount version[%s] on [%s] failed: %s", versionCoord.DebugString().c_str(),
                   versionRoot.c_str(), status.ToString().c_str());
        return {status, Version()};
    }
    // used for update metrics for PartitionIndexSize later
    [[maybe_unused]] auto ret =
        lfs->CalculateVersionFileSize(versionRoot, /*rawLogicalPath=*/"", versionCoord.GetVersionId());
    return std::make_pair(Status::OK(), std::move(version));
}

// Attaches the schema group (cached schemas, write schema, on-disk write/read schemas) to the resource
// map of `tabletData` and declares every configured index task on it.
// Returns InternalError when `tabletData` has no resource map.
Status Tablet::FinalizeTabletData(TabletData* tabletData, const std::shared_ptr<config::ITabletSchema>& writeSchema)
{
    assert(tabletData);
    auto resourceMap = tabletData->GetResourceMap();
    if (!resourceMap) {
        auto st = Status::InternalError("resource map is empty");
        TABLET_LOG(ERROR, "%s", st.ToString().c_str());
        return st;
    }
    const auto& version = tabletData->GetOnDiskVersion();
    auto schemaGroup = std::make_shared<TabletDataSchemaGroup>();
    schemaGroup->multiVersionSchemas = _tabletSchemaMgr->GetTabletSchemaCache();
    schemaGroup->writeSchema = writeSchema;
    auto schema = _tabletSchemaMgr->GetSchema(version.GetSchemaId());
    assert(schema);
    schemaGroup->onDiskWriteSchema = schema;
    schema = _tabletSchemaMgr->GetSchema(version.GetReadSchemaId());
    assert(schema);
    schemaGroup->onDiskReadSchema = schema;
    auto status = resourceMap->AddVersionResource(TabletDataSchemaGroup::NAME, schemaGroup);
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "add [%s] to resource map failed: %s", TabletDataSchemaGroup::NAME,
                   status.ToString().c_str());
        return status;
    }
    // declare task configs
    const auto& taskConfigs = _tabletOptions->GetAllIndexTaskConfigs();
    for (auto& config : taskConfigs) {
        tabletData->DeclareTaskConfig(config.GetTaskName(), config.GetTaskType());
    }
    return Status::OK();
}

// Suggested memory budget for one building segment: a third of the max realtime memory use for online
// tablets, otherwise the build quota when one is set to something other than the int64 maximum,
// otherwise 6G.
int64_t Tablet::GetSuggestBuildingSegmentMemoryUse()
{
    if (_tabletOptions->IsOnline()) {
        return _tabletOptions->GetOnlineConfig().GetMaxRealtimeMemoryUse() / 3;
    }
    if (_buildMemoryQuotaSynchronizer &&
        _buildMemoryQuotaSynchronizer->GetTotalQuota() != std::numeric_limits<int64_t>::max()) {
        return _buildMemoryQuotaSynchronizer->GetTotalQuota();
    }
    return 6L * 1024 * 1024 * 1024; // default 6G
}

// Replaces _tabletData with an empty tablet data (empty version, no segments, fresh resource map)
// finalized for `tabletSchema`.
Status Tablet::PrepareEmptyTabletData(const std::shared_ptr<config::ITabletSchema>& tabletSchema)
{
    // assert dataMutex has been locked or not necessary
    _tabletData = std::make_unique<TabletData>(_tabletInfos->GetTabletName());
    auto emptyVersion = MakeEmptyVersion(tabletSchema->GetSchemaId());
    auto status =
        _tabletData->Init(/*onDiskVersion=*/emptyVersion, /*segments=*/ {}, std::make_shared<ResourceMap>());
    if (!status.IsOK()) {
        // Checked explicitly (not only asserted) so the failure is also reported when assertions are
        // compiled out.
        TABLET_LOG(ERROR, "init empty tablet data failed: %s", status.ToString().c_str());
        assert(false);
        return status;
    }
    status = FinalizeTabletData(_tabletData.get(), tabletSchema);
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "finalize tablet data failed: %s", status.ToString().c_str());
        assert(false);
        return status;
    }
    return Status::OK();
}

// True for an online, non-leader tablet that flushes locally only and is configured to load the
// remaining flushed realtime index.
bool Tablet::NeedRecoverFromLocal() const
{
    return (!_tabletOptions->IsLeader() && _tabletOptions->IsOnline() && _tabletOptions->FlushLocal() &&
            !_tabletOptions->FlushRemote() && _tabletOptions->GetOnlineConfig().LoadRemainFlushRealtimeIndex());
}

// Opens the tablet on `indexRoot` with `tabletSchema` and a private copy of `options`, optionally
// recovering the latest local version first, then loads `versionCoord` and starts the interval tasks.
// Exceptions thrown while opening are converted into IOError / Unknown statuses carrying the exception
// message.
Status Tablet::Open(const IndexRoot& indexRoot, const std::shared_ptr<config::ITabletSchema>& tabletSchema,
                    const std::shared_ptr<config::TabletOptions>& options, const VersionCoord& versionCoord)
{
    TABLET_LOG(INFO, "open tablet begin, indexRoot[%s], version[%s], leader[%d], flushlocal[%d], flushremote[%d]",
               indexRoot.ToString().c_str(), versionCoord.DebugString().c_str(), options->IsLeader(),
               options->FlushLocal(), options->FlushRemote());
    autil::ScopedTime2 timer;
    _tabletInfos->SetIndexRoot(indexRoot);
    _tabletOptions = std::make_shared<config::TabletOptions>(*options);
    _tabletOptions->SetTabletName(_tabletInfos->GetTabletName());
    _tabletFactory = CreateTabletFactory(tabletSchema->GetTableType(), _tabletOptions);
    if (_tabletFactory == nullptr) {
        auto st = Status::ConfigError("create tablet factory failed");
        TABLET_LOG(ERROR, "%s", st.ToString().c_str());
        return st;
    }
    auto status = TabletSchemaLoader::ResolveSchema(_tabletOptions, indexRoot.GetRemoteRoot(), tabletSchema.get());
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "resolve schema failed: %s", status.ToString().c_str());
        return status;
    }
    if (_idGenerator == nullptr) {
        _idGenerator = std::make_shared<IdGenerator>(_tabletOptions->FlushRemote() ? IdMaskType::BUILD_PUBLIC
                                                                                   : IdMaskType::BUILD_PRIVATE);
        TABLET_LOG(INFO, "use built-in id generator");
    }
    status = _tabletOptions->Check(_tabletInfos->GetIndexRoot().GetRemoteRoot(),
                                   _tabletInfos->GetIndexRoot().GetLocalRoot());
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "check tablet options failed");
        return status;
    }
    try {
        auto status = PrepareIndexRoot(tabletSchema);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "tablet prepare index root [%s] failed", indexRoot.ToString().c_str());
            return status;
        }
        _tabletSchemaMgr =
            std::make_unique<TabletSchemaManager>(_tabletFactory, _tabletOptions, _fence.GetGlobalRoot(),
                                                  indexRoot.GetRemoteRoot(), _fence.GetFileSystem());
        _tabletSchemaMgr->InsertSchemaToCache(tabletSchema);
        _memSegmentCreator =
            std::make_unique<MemSegmentCreator>(_tabletInfos->GetTabletName(), _tabletOptions.get(),
                                                _tabletFactory.get(), _idGenerator.get(),
                                                GetRootDirectory()->GetIDirectory());
        status = PrepareEmptyTabletData(tabletSchema);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "tablet prepare empty tablet data failed");
            return status;
        }
        status = PrepareResource();
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "tablet prepare resource failed");
            return status;
        }
        versionid_t loadedLocalVersionId = INVALID_VERSIONID;
        autil::ScopeGuard tabletPhaseGuard = _tabletMetrics->CreateTabletPhaseGuard(TabletPhase::OPEN);
        if (NeedRecoverFromLocal()) {
            std::string fenceName =
                Fence::GenerateNewFenceName(_tabletOptions->FlushRemote(), _tabletInfos->GetTabletId());
            std::string localFenceRoot = PathUtil::JoinPath(_tabletInfos->GetIndexRoot().GetLocalRoot(), fenceName);
            auto [recoverStatus, localVersion] = RecoverLatestVersion(localFenceRoot);
            if (!recoverStatus.IsOK()) {
                auto cleanStatus = DropIndexCleaner::DropPrivateFence(_tabletInfos->GetIndexRoot().GetLocalRoot());
                RETURN_IF_STATUS_ERROR(cleanStatus, "clean local index failed");
                TABLET_LOG(WARN, "local index is cleaned [%s]", localFenceRoot.c_str());
                return recoverStatus;
            }
            Version targetVersion;
            std::string versionRoot;
            status = LoadVersion(versionCoord, &targetVersion, &versionRoot);
            if (!status.IsOK()) {
                TABLET_LOG(WARN, "load version[%s] failed", versionCoord.DebugString().c_str());
                return status;
            }
            if (!localVersion.IsValid() || localVersion.GetVersionId() == INVALID_VERSIONID ||
                targetVersion.GetLocator().IsFasterThan(localVersion.GetLocator(), /*ignoreLegacyDiffSrc=*/true) ==
                    Locator::LocatorCompareResult::LCR_FULLY_FASTER) {
                // do nothing
            } else if (!targetVersion.CanFastFowardFrom(localVersion.GetVersionLine().GetHeadVersion(),
                                                        /*isDirty*/ false)) {
                auto cleanStatus = DropIndexCleaner::DropPrivateFence(_tabletInfos->GetIndexRoot().GetLocalRoot());
                RETURN_IF_STATUS_ERROR(cleanStatus, "clean local index failed");
                TABLET_LOG(WARN, "local index is cleaned [%s]", localFenceRoot.c_str());
                status = Status::Abort("local private version [%d] is not compatible with target version[%s]",
                                       localVersion.GetVersionId(), versionCoord.DebugString().c_str());
                TABLET_LOG(ERROR, "%s", status.ToString().c_str());
                return status;
            } else {
                std::lock_guard<std::mutex> lockReopen(_reopenMutex);
                status = DoReopenUnsafe(ReopenOptions(_openOptions), localVersion.GetVersionId());
                if (!status.IsOK()) {
                    TABLET_LOG(ERROR, "recover from local version [%d] failed", localVersion.GetVersionId());
                    auto cleanStatus =
                        DropIndexCleaner::DropPrivateFence(_tabletInfos->GetIndexRoot().GetLocalRoot());
                    RETURN_IF_STATUS_ERROR(cleanStatus, "clean local index failed");
                    TABLET_LOG(WARN, "local index is cleaned [%s]", localFenceRoot.c_str());
                    return status;
                }
                loadedLocalVersionId = localVersion.GetVersionId();
                TABLET_LOG(INFO, "recover from local version[%d] succeed.", loadedLocalVersionId);
            }
        }
        if (loadedLocalVersionId != INVALID_VERSIONID && versionCoord.GetVersionId() == INVALID_VERSIONID) {
            // do nothing
        } else {
            std::lock_guard<std::mutex> lockReopen(_reopenMutex);
            status = DoReopenUnsafe(ReopenOptions(_openOptions), versionCoord);
            if (!status.IsOK()) {
                TABLET_LOG(ERROR, "tablet load version [%s] failed", versionCoord.DebugString().c_str());
                return status;
            }
        }
    } catch (const indexlib::util::FileIOException& e) {
        // Status factories take a printf-style format: the "%s" is required for e.what() to reach the message.
        TABLET_LOG(ERROR, "open caught file io exception: %s", e.what());
        return Status::IOError("open caught file io exception: %s", e.what());
    } catch (const autil::legacy::ExceptionBase& e) {
        TABLET_LOG(ERROR, "open caught exception, %s", e.what());
        return Status::Unknown("open caught exception: %s", e.what());
    } catch (const std::exception& e) {
        TABLET_LOG(ERROR, "open caught exception, %s", e.what());
        return Status::Unknown("open caught exception: %s", e.what());
    } catch (...) {
        TABLET_LOG(ERROR, "open caught exception");
        return Status::Unknown("open caught exception");
    }
    if (!StartIntervalTask()) {
        TABLET_LOG(ERROR, "start interval task failed");
        return Status::Corruption("Start interval task failed.");
    }
    _tabletCommitter->Init(_versionMerger, _tabletData);
    TABLET_LOG(INFO, "open tablet end, used [%.3f]s", timer.done_sec());
    _isClosed = false;
    _needSeek.store(false);
    return Status::OK();
}

// Refreshes the doc count of the current tablet data and publishes it to the tablet infos.
void Tablet::UpdateDocCount()
{
    _tabletData->RefreshDocCount();
    _tabletInfos->UpdateTabletDocCount(_tabletData->GetTabletDocCount());
}

// This is used inside Reopen() and updates runtime state of the tablet. Data and internal data structures should not be
// changed inside this function.
Status Tablet::UpdateControlFlow(const ReopenOptions& reopenOptions)
{
    assert(reopenOptions.GetOpenOptions().GetUpdateControlFlowOnly());
    // Each parallel-build thread pool is created at most once, and only when a positive thread count is
    // configured in the open options.
    if (_consistentModeBuildThreadPool == nullptr &&
        reopenOptions.GetOpenOptions().GetConsistentModeBuildThreadCount() > 0) {
        size_t queueSize = size_t(-1);
        auto threadPoolQueueFactory = std::make_shared<autil::ThreadPoolQueueFactory>();
        _consistentModeBuildThreadPool =
            std::make_shared<autil::ThreadPool>(reopenOptions.GetOpenOptions().GetConsistentModeBuildThreadCount(),
                                                queueSize, threadPoolQueueFactory, "consistent-parallel-build");
    }
    if (_inconsistentModeBuildThreadPool == nullptr &&
        reopenOptions.GetOpenOptions().GetInconsistentModeBuildThreadCount() > 0) {
        size_t queueSize = size_t(-1);
        auto threadPoolQueueFactory = std::make_shared<autil::ThreadPoolQueueFactory>();
        _inconsistentModeBuildThreadPool =
            std::make_shared<autil::ThreadPool>(reopenOptions.GetOpenOptions().GetInconsistentModeBuildThreadCount(),
                                                queueSize, threadPoolQueueFactory, "inconsistent-parallel-build");
    }
    std::lock_guard<std::mutex> guard(_dataMutex);
    if (_tabletWriter == nullptr) {
        // Writer is not initialized yet, so there is nothing to update.
        return Status::OK();
    }
    // Re-open the existing writer without tablet data so that only the open options are applied.
    BuildResource buildResource = GenerateBuildResource(COUNTER_PREFIX);
    return _tabletWriter->Open(/*tabletData=*/nullptr, buildResource, reopenOptions.GetOpenOptions());
}

// Reopens the tablet on versionCoord.
//  - A force reopen is rejected with Status::Unimplement.
//  - CONTROL_FLOW_VERSIONID only refreshes control-flow state through UpdateControlFlow() and stores the passed
//    open options (with the update-control-flow-only flag cleared) in _openOptions.
//  - INVALID_VERSIONID is rejected with Status::InvalidArgs.
//  - Any other version is loaded by DoReopenUnsafe() while holding _reopenMutex, using the stored _openOptions.
// Exceptions thrown while loading are converted into IOError / Unknown statuses that carry the exception text.
Status Tablet::Reopen(const ReopenOptions& reopenOptions, const VersionCoord& versionCoord)
{
    const auto& versionId = versionCoord.GetVersionId();
    TABLET_LOG(INFO, "reopen tablet begin, force[%d], version [%d]", reopenOptions.IsForceReopen(), versionId);
    if (reopenOptions.IsForceReopen()) {
        return Status::Unimplement("tablet not support force reopen, try reload, version [%d]", versionId);
    }
    autil::ScopedTime2 timer;
    if (versionId == CONTROL_FLOW_VERSIONID) {
        TABLET_LOG(INFO, "reopen as entrance to update control flow only.");
        auto status = UpdateControlFlow(reopenOptions);
        _openOptions = reopenOptions.GetOpenOptions();
        _openOptions.SetUpdateControlFlowOnly(false);
        TABLET_LOG(INFO, "reopen tablet end, version [%d], status [%s], used [%.3f]s", versionId,
                   status.ToString().c_str(), timer.done_sec());
        return status;
    }
    // Data reopen always runs with the open options remembered by the tablet, not the ones passed in.
    ReopenOptions clonedReopenOptions;
    clonedReopenOptions.SetOpenOptions(_openOptions);
    if (versionId == INVALID_VERSIONID) {
        TABLET_LOG(ERROR, "reopen failed, version invalid");
        return Status::InvalidArgs("version invalid");
    }
    try {
        TabletPhase tabletPhase =
            clonedReopenOptions.IsForceReopen() ? TabletPhase::FORCE_REOPEN : TabletPhase::NORMAL_REOPEN;
        autil::ScopeGuard tabletPhaseGuard = _tabletMetrics->CreateTabletPhaseGuard(tabletPhase);
        std::lock_guard<std::mutex> lockReopen(_reopenMutex);
        auto status = DoReopenUnsafe(clonedReopenOptions, versionCoord);
        TABLET_LOG(INFO, "reopen tablet end, force[%d], version [%s], status [%s], used [%.3f]s",
                   clonedReopenOptions.IsForceReopen(), versionCoord.DebugString().c_str(),
                   status.ToString().c_str(), timer.done_sec());
        return status;
    } catch (const indexlib::util::FileIOException& e) {
        TABLET_LOG(ERROR, "reopen caught file io exception: %s", e.what());
        // The Status factories take a printf-style format, so the exception text needs an explicit %s.
        return Status::IOError("reopen caught file io exception: %s", e.what());
    } catch (const std::exception& e) {
        TABLET_LOG(ERROR, "reopen caught exception: %s", e.what());
        return Status::Unknown("reopen caught std exception: %s", e.what());
    } catch (...) {
        TABLET_LOG(ERROR, "reopen caught unknown exception");
        return Status::Unknown("reopen caught unknown exception");
    }
}

// Computes the version coordinate that the tablet currently stands on.
// The version id is the head of the on-disk version line. The fence name is the head version's fence, unless the
// tablet holds a public segment that is not yet built, in which case the tablet's own fence is used.
// @param hasBuildingSegment  output; set to true when such a not-yet-built public segment exists.
VersionCoord CalculateHead(const std::shared_ptr<TabletData>& currentTabletData, const Fence& fence,
                           bool* hasBuildingSegment)
{
    assert(currentTabletData);
    auto ondiskVersion = currentTabletData->GetOnDiskVersion();
    auto headVersion = ondiskVersion.GetVersionLine().GetHeadVersion();
    versionid_t headVersionId = headVersion.GetVersionId();
    std::string headFence = headVersion.GetVersionFenceName();
    auto segments = currentTabletData->CreateSlice();
    // A building segment means this tablet has built data itself, so it must not reopen a version produced by
    // another fence; otherwise segment ids could collide.
    *hasBuildingSegment = false;
    for (const auto& segmentPtr : segments) {
        if (segmentPtr->GetSegmentStatus() != Segment::SegmentStatus::ST_BUILT &&
            Segment::IsPublicSegmentId(segmentPtr->GetSegmentId())) {
            *hasBuildingSegment = true;
            break; // one match is enough, the remaining segments cannot change the answer
        }
    }
    if (*hasBuildingSegment) {
        headFence = fence.GetFenceName();
    }
    return VersionCoord(headVersionId, headFence);
}

// Loads versionCoord into the tablet. Reopen() calls this while holding _reopenMutex.
Status Tablet::DoReopenUnsafe(const ReopenOptions& reopenOptions, const VersionCoord& versionCoord)
{
    // do not clean in-memory index while reopen
// otherwise mounted version may be erased by mistake.
    std::lock_guard<std::mutex> lockCleaner(_cleanerMutex);
    BuildResource buildResource = GenerateBuildResource(COUNTER_PREFIX);
    indexlib::util::ScopeLatencyReporter reopenLatency(_tabletMetrics->GetreopenIncLatencyMetric().get());

    // Step 1: build the deploy description of the target version and mount that version.
    auto statusOrDpDesc = CreateVersionDeployDescription(versionCoord.GetVersionId());
    RETURN_IF_STATUS_ERROR(statusOrDpDesc, "create version deploy description failed for version [%d]",
                           versionCoord.GetVersionId());
    auto versionDpDesc = std::move(statusOrDpDesc.steal_value());
    auto [mountStatus, version] = MountOnDiskVersion(versionCoord, versionDpDesc);
    if (!mountStatus.IsOK()) {
        TABLET_LOG(ERROR, "mount version [%s] failed: %s", versionCoord.DebugString().c_str(),
                   mountStatus.ToString().c_str());
        return mountStatus;
    }

    // Step 2: take a reference to the current tablet data; _dataMutex is held only for the pointer copy.
    std::shared_ptr<TabletData> currentTabletData;
    {
        std::lock_guard<std::mutex> lock(_dataMutex);
        currentTabletData = _tabletData;
    }
    TABLET_LOG(INFO, "do reopen, version [%d => %d]", currentTabletData->GetOnDiskVersion().GetVersionId(),
               version.GetVersionId());

    // Step 3: an online tablet only accepts a public version that can fast-forward from the current head.
    // Versions whose id carries PRIVATE_VERSION_ID_MASK skip this check.
    bool hasBuildingSegment;
    auto headVersionCoord = CalculateHead(currentTabletData, _fence, &hasBuildingSegment);
    bool isPrivateVersion = (versionCoord.GetVersionId() & Version::PRIVATE_VERSION_ID_MASK) > 0;
    if (_tabletOptions->IsOnline() && !isPrivateVersion &&
        !version.CanFastFowardFrom(headVersionCoord, hasBuildingSegment)) {
        TABLET_LOG(ERROR,
                   "do reopen failed, version can't fast from onDiskVersion, head version [%s], version line [%s]",
                   autil::legacy::ToJsonString(headVersionCoord, true).c_str(),
                   autil::legacy::ToJsonString(version.GetVersionLine()).c_str());
        return Status::Corruption("reopen fastford failed");
    }
    _idGenerator->UpdateBaseVersion(version);

    // Step 4: load every schema referenced by the version.
    auto loadSchemaStatus = _tabletSchemaMgr->LoadAllSchema(version);
    if (!loadSchemaStatus.IsOK()) {
        TABLET_LOG(ERROR, "do reopen failed, load all schema failed");
        return loadSchemaStatus;
    }
    TABLET_LOG(INFO, "begin load segments, force[%d], segments [%s]", reopenOptions.IsForceReopen(),
               GetDiffSegmentDebugString(version, currentTabletData).c_str());
    auto readSchema = _tabletSchemaMgr->GetSchema(version.GetReadSchemaId());
    // NOTE(review): currentTabletData has already been dereferenced above, so this null check cannot protect the
    // earlier uses - confirm whether it is still needed.
    if (currentTabletData) {
        auto status = DropIndexCleaner::CleanIndexInLogical(currentTabletData, readSchema,
                                                            GetRootDirectory()->GetIDirectory());
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "do reopen failed, clean drop index failed");
            return status;
        }
    }

    // Step 5: compute the lifecycle table of the target version, evaluated at the current time in seconds.
    auto currentLifecycleTable = LifecycleTableCreator::CreateLifecycleTable(
        version, _tabletOptions->GetOnlineConfig().GetLifecycleConfig(),
        {{indexlib::file_system::LifecyclePatternBase::CURRENT_TIME,
          std::to_string(indexlib::file_system::LifecycleConfig::CurrentTimeInSeconds())}});
    if (currentLifecycleTable == nullptr) {
        TABLET_LOG(ERROR, "do reopen failed, create LifecycleTable failed for version[%d]", version.GetVersionId());
        return Status::Corruption("reopen failed due to null lifecycleTable");
    }

    // Step 6: for every segment of the target version decide whether the segment object held by the current tablet
    // data can be reused (paired with needOpen=false) or a new disk segment has to be created (needOpen=true).
    std::vector<std::pair<std::shared_ptr<Segment>, bool>> segmentPairs;
    std::shared_ptr<indexlib::file_system::Directory> root = GetRootDirectory();
    for (auto [segmentId, schemaId] : version) {
        auto seg = currentTabletData->GetSegment(segmentId);
        auto segmentDirName = version.GetSegmentDirName(segmentId);
        auto currentLifecycle = currentLifecycleTable->GetLifecycle(segmentDirName + "/");
        bool needLoadDiskSegment = true;
        if (seg != nullptr && seg->GetSegmentStatus() == Segment::SegmentStatus::ST_BUILT) {
            seg->GetSegmentDirectory()->SetLifecycle(currentLifecycle);
            auto segmentSchemaId = seg->GetSegmentSchema()->GetSchemaId();
            auto preLifecycle = seg->GetSegmentLifecycle();
            if (segmentSchemaId != schemaId) {
                // The built segment is on another schema id than the version asks for: reopen it in place with
                // the schema list between the two ids.
                auto diskSegment = std::dynamic_pointer_cast<DiskSegment>(seg);
                assert(diskSegment != nullptr);
                std::vector<std::shared_ptr<config::ITabletSchema>> tabletSchemas;
                auto status = _tabletSchemaMgr->GetSchemaList(segmentSchemaId, schemaId, version, tabletSchemas);
                RETURN_IF_STATUS_ERROR(status, "get schema list failed, segmentSchemaId[%d], segmentId[%d]",
                                       segmentSchemaId, segmentId);
                if (preLifecycle != currentLifecycle) {
                    // NOTE(review): status is not reassigned since the RETURN_IF_STATUS_ERROR above, so this
                    // check presumably never returns and the conflict goes unreported - confirm whether an
                    // explicit error status was intended here.
                    RETURN_IF_STATUS_ERROR(
                        status, "config conficts, segmentId[%d] schemaId updated [%d->%d], lifecycle updated[%s -> %s]",
                        segmentId, segmentSchemaId, schemaId, preLifecycle.c_str(), currentLifecycle.c_str());
                }
                status = diskSegment->Reopen(tabletSchemas);
                RETURN_IF_STATUS_ERROR(status, "disk segment open failed, segmentId[%d]", segmentId);
                needLoadDiskSegment = false;
            } else {
                if (preLifecycle != currentLifecycle) {
                    // Same schema but a different lifecycle: fall through and load the segment from disk again.
                    TABLET_LOG(INFO, "built segmentId[%d] lifecycle updated [%s] -> [%s]", segmentId,
                               preLifecycle.c_str(), currentLifecycle.c_str());
                } else {
                    needLoadDiskSegment = false;
                }
            }
        }
        if (!needLoadDiskSegment) {
            segmentPairs.emplace_back(std::make_pair(seg, /*needOpen=*/false));
        } else {
            auto segDir = root->GetDirectory(segmentDirName, /*throwExceptionIfNotExist=*/false);
            if (!segDir) {
                auto st = Status::IOError("get segment[%d] dir failed", segmentId);
                TABLET_LOG(ERROR, "do reopen failed, %s", st.ToString().c_str());
                return st;
            }
            segDir->SetLifecycle(currentLifecycle);
            SegmentMeta segmentMeta;
            segmentMeta.segmentId = segmentId;
            segmentMeta.lifecycle = currentLifecycle;
            auto schema = _tabletSchemaMgr->GetSchema(schemaId);
            segmentMeta.schema = schema;
            segmentMeta.segmentDir = segDir;
            auto readerOption = indexlib::file_system::ReaderOption::PutIntoCache(indexlib::file_system::FSOT_MEM);
            if (!segmentMeta.segmentInfo->Load(segDir->GetIDirectory(), readerOption).IsOK()) {
                auto st = Status::Corruption("load segment info[%s] failed", segDir->GetLogicalPath().c_str());
                TABLET_LOG(ERROR, "do reopen failed, %s", st.ToString().c_str());
                return st;
            }
            // CreateDiskSegment returns an owning pointer; ownership is handed over to a shared_ptr<Segment>.
            auto diskSegment =
                std::shared_ptr<Segment>(_tabletFactory->CreateDiskSegment(segmentMeta, buildResource).release());
            assert(diskSegment);
            segmentPairs.emplace_back(std::make_pair(diskSegment, /*needOpen=*/true));
        }
    }
    TABLET_LOG(INFO, "end load segments, segment count [%lu]", segmentPairs.size());

    // Step 7: PreLoad runs before _dataMutex is taken; FinalLoad and everything after it runs under the lock.
    TABLET_LOG(INFO, "begin load tablet data, fence name[%s]", _fence.GetFenceName().c_str());
    const indexlib::util::MemoryReserverPtr memReserver =
        _fence.GetFileSystem()->CreateMemoryReserver("load_segments");
    auto tabletLoader = _tabletFactory->CreateTabletLoader(_fence.GetFenceName());
    assert(tabletLoader);
    auto versionSchema = _tabletSchemaMgr->GetSchema(version.GetSchemaId());
    tabletLoader->Init(_tabletMemoryQuotaController, versionSchema, memReserver, _tabletOptions->IsOnline());
    {
        indexlib::util::ScopeLatencyReporter preloadLatency(_tabletMetrics->GetpreloadLatencyMetric().get());
        auto status = tabletLoader->PreLoad(*currentTabletData, std::move(segmentPairs), version);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "do reopen failed, tablet preload version [%s] failed", version.DebugString().c_str());
            return status;
        }
    }
    indexlib::util::ScopeLatencyReporter finalLoadLatency(_tabletMetrics->GetfinalLoadLatencyMetric().get());
    // _dataMutex stays locked from here until the function returns.
    std::lock_guard lock(_dataMutex);
    // A version on a newer schema id than the tablet's current schema triggers an alter-table first.
    if (version.GetSchemaId() > GetTabletSchema()->GetSchemaId()) {
        auto status = DoAlterTable(versionSchema);
        if (!status.IsOK()) {
            TABLET_LOG(ERROR, "do reopen failed, do alter table on schema[%u] failed: %s", version.GetSchemaId(),
                       status.ToString().c_str());
            return status;
        }
    }
    // FinalLoad is given the live _tabletData, not the reference taken in step 2.
    auto [status, newTabletData] = tabletLoader->FinalLoad(*_tabletData);
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "do reopen failed, tablet final load failed");
        return status;
    }
    status = FinalizeTabletData(newTabletData.get(), GetTabletSchema());
    if (!status.IsOK()) {
        TABLET_LOG(ERROR, "do reopen failed, finalize tablet data failed: %s", status.ToString().c_str());
        return status;
    }
    assert(newTabletData->GetResourceMap());
    newTabletData->ReclaimSegmentResource();
    TABLET_LOG(INFO, "end load tablet data");
    _tabletDumper->TrimDumpingQueue(*newTabletData);
    // A private version keeps the deploy description that is already recorded in _tabletInfos.
    if (version.GetVersionId() & Version::PRIVATE_VERSION_ID_MASK) {
        versionDpDesc = _tabletInfos->GetLoadedVersionDeployDescription();
    }

    // Step 8: switch writer and reader to the new tablet data; the loaded version is recorded only on success.
    status = OpenWriterAndReader(std::move(newTabletData), reopenOptions.GetOpenOptions(), versionDpDesc);
    if (status.IsOK()) {
        if (!(version.GetVersionId() & Version::PRIVATE_VERSION_ID_MASK)) {
            _tabletInfos->SetLoadedPublishVersion(version);
            _tabletInfos->SetLoadedVersionDeployDescription(versionDpDesc);
        } else {
            _tabletInfos->SetLoadedPrivateVersion(version);
        }
        if (_versionMerger) {
            _versionMerger->UpdateVersion(version);
        }
    } else {
        TABLET_LOG(ERROR, "do reopen failed, tablet open writer and reader failed");
    }
    // The sealed state is refreshed only while GetMemSegmentLocator() is not valid: it follows the sealed flag of
    // the loaded publish version.
    if (!GetMemSegmentLocator().IsValid()) {
        if (_tabletInfos->GetLoadedPublishVersion().IsSealed()) {
            _sealedSourceLocator = _tabletInfos->GetLoadedPublishVersion().GetLocator();
            if (_sealedSourceLocator) {
                _tabletCommitter->SetSealed(true);
            }
        } else {
            if (_sealedSourceLocator) {
                _sealedSourceLocator.reset();
                _tabletCommitter->SetSealed(false);
            }
        }
    }
    // NOTE(review): this runs even when OpenWriterAndReader failed, and also for private versions - confirm that
    // both are intended.
    _tabletCommitter->SetLastPublicVersion(version);
    RETURN_IF_STATUS_ERROR(status, "open writer and reader failed");
    return Status::OK();
}

// Shuts the tablet down: cleans and releases the task scheduler, waits for the version merger to stop, releases
// the current reader, closes the reader container and finally invokes _closeRunFunc when one is set.
// Returns immediately when _isClosed is already set.
void Tablet::Close()
{
    if (_isClosed) {
        return;
    }
    TABLET_LOG(INFO, "close tablet [%p] begin", this);
    if (_taskScheduler) {
        _taskScheduler->CleanTasks();
        _taskScheduler.reset();
    }
    if (_versionMerger) {
        _versionMerger->WaitStop();
    }
    _tabletReader.reset();
    if (_tabletReaderContainer) {
        _tabletReaderContainer->Close();
    }
    TABLET_LOG(INFO, "close tablet [%p] end", this);
    _isClosed = true;
    if (_closeRunFunc) {
        _closeRunFunc();
    }
}

// Prepares the index root: InitIndexDirectory() on the tablet's index root, then InitBasicIndexInfo() with the
// given schema. Returns the first non-OK status, otherwise Status::OK().
Status Tablet::PrepareIndexRoot(const std::shared_ptr<config::ITabletSchema>& schema)
{
    // TODO memory controller
    TABLET_LOG(INFO, "begin prepare index root");
    auto status = InitIndexDirectory(_tabletInfos->GetIndexRoot());
    if (!status.IsOK()) {
        return status;
    }
    status = InitBasicIndexInfo(schema);
    if (!status.IsOK()) {
        return status;
    }
    TABLET_LOG(INFO, "end prepare index root");
    return Status::OK();
}

// Returns true when the schema file or the index format version file is missing under the directory's output
// path, false when both exist. The existence checks use GetOrThrow(), so they may throw.
bool Tablet::IsEmptyDir(std::shared_ptr<indexlib::file_system::Directory> directory,
                        const std::shared_ptr<config::ITabletSchema>& schema)
{
    std::string rootDir = directory->GetOutputPath();
    std::string schemaFileName = schema->GetSchemaFileName();
    std::string schemaFilePath =
PathUtil::JoinPath(rootDir, schemaFileName); bool schemaExist = indexlib::file_system::FslibWrapper::IsExist(schemaFilePath).GetOrThrow(); if (!schemaExist) { TABLET_LOG(INFO, "empty index dir, no schema file"); return true; } std::string indexFormatVersionPath = PathUtil::JoinPath(rootDir, INDEX_FORMAT_VERSION_FILE_NAME); bool formatVersionExist = indexlib::file_system::FslibWrapper::IsExist(indexFormatVersionPath).GetOrThrow(); if (!formatVersionExist) { TABLET_LOG(INFO, "empty index dir, no [%s] file", INDEX_FORMAT_VERSION_FILE_NAME); return true; } return false; } Status Tablet::InitBasicIndexInfo(const std::shared_ptr<config::ITabletSchema>& schema) { TABLET_LOG(INFO, "begin init index dir"); try { auto rootDirectory = indexlib::file_system::Directory::GetPhysicalDirectory(_fence.GetGlobalRoot()); if (!IsEmptyDir(rootDirectory, schema) || _tabletOptions->IsReadOnly()) { return Status::OK(); } // BINARY_FORMAT_VERSION = 1 std::string indexFormatVersionStr = std::string(R"({"index_format_version":")") + indexlib::INDEX_FORMAT_VERSION + R"(","inverted_index_binary_format_version":1})"; std::string formatVersionPath = indexlib::util::PathUtil::JoinPath(_fence.GetGlobalRoot(), INDEX_FORMAT_VERSION_FILE_NAME); auto ec = indexlib::file_system::FslibWrapper::AtomicStore(formatVersionPath, indexFormatVersionStr).Code(); if (ec != indexlib::file_system::FSEC_OK && ec != indexlib::file_system::FSEC_EXIST) { return Status::IOError(); } std::string schemaFileName = schema->GetSchemaFileName(); std::string content; if (!schema->Serialize(/*isCompact*/ false, &content)) { TABLET_LOG(ERROR, "schema serialize failed"); return Status::IOError(); } std::string schemaPath = indexlib::util::PathUtil::JoinPath(_fence.GetGlobalRoot(), schemaFileName); ec = indexlib::file_system::FslibWrapper::AtomicStore(schemaPath, content).Code(); if (ec != indexlib::file_system::FSEC_OK && ec != indexlib::file_system::FSEC_EXIST) { return Status::IOError(); } rootDirectory->Sync(true); 
_fence.GetFileSystem() ->MountFile(rootDirectory->GetRootDir(), INDEX_FORMAT_VERSION_FILE_NAME, INDEX_FORMAT_VERSION_FILE_NAME, indexlib::file_system::FSMT_READ_ONLY, indexFormatVersionStr.size(), false) .GetOrThrow(); _fence.GetFileSystem() ->MountFile(rootDirectory->GetRootDir(), schemaFileName, schemaFileName, indexlib::file_system::FSMT_READ_ONLY, content.size(), false) .GetOrThrow(); } catch (const autil::legacy::ExceptionBase& e) { TABLET_LOG(ERROR, "init index dir failed io exception [%s]", e.what()); return Status::IOError(); } TABLET_LOG(INFO, "end init index dir"); return Status::OK(); } Status Tablet::PrepareResource() { TABLET_LOG(INFO, "begin prepare resource"); if (_tabletReaderContainer) { return Status::OK(); } auto planCreator = _tabletFactory->CreateIndexTaskPlanCreator(); if (planCreator && _mergeController && _tabletOptions->IsLeader()) { auto indexRoot = _tabletInfos->GetIndexRoot().GetRemoteRoot(); _versionMerger = std::make_shared<VersionMerger>(_tabletInfos->GetTabletName(), std::move(indexRoot), _mergeController, std::move(planCreator), _metricsManager.get(), _tabletOptions->IsOnline()); TABLET_LOG(INFO, "version merger created, isOnline[%d]", _tabletOptions->IsOnline()); } if (_buildMemoryQuotaSynchronizer == nullptr) { auto buildMemoryQuotaController = std::make_shared<MemoryQuotaController>( _tabletInfos->GetTabletName() + "_build", _tabletOptions->GetBuildMemoryQuota()); _buildMemoryQuotaSynchronizer = std::make_shared<MemoryQuotaSynchronizer>(buildMemoryQuotaController); } auto reporter = _fence.GetFileSystem()->GetFileSystemMetricsReporter(); if (reporter != nullptr) { indexlib::util::KeyValueMap tagMap; tagMap["schema_name"] = GetTabletSchema()->GetTableName(); // string generationStr; // if (onDiskVersion.GetDescription("generation", generationStr)) { // tagMap["generation"] = generationStr; // } reporter->UpdateGlobalTags(tagMap); } _tabletInfos->SetCounterMap(_metricsManager->GetCounterMap()); auto status = 
_tabletInfos->InitCounter(_tabletOptions->IsOnline()); if (!status.IsOK()) { TABLET_LOG(ERROR, "init counter failed [%s]", status.ToString().c_str()); return status; } _tabletReaderContainer = std::make_shared<TabletReaderContainer>(_tabletInfos->GetTabletName()); _tabletDumper->Init(_tabletOptions->GetMaxDumpIntervalSecond()); assert(!_tabletMetrics); _tabletMetrics = std::dynamic_pointer_cast<TabletMetrics>( _metricsManager->CreateMetrics("tablet", [this]() -> std::shared_ptr<IMetrics> { return std::make_shared<TabletMetrics>(_metricsManager->GetMetricsReporter(), _tabletMemoryQuotaController.get(), _tabletInfos->GetTabletName(), _tabletDumper.get(), _versionMerger.get()); })); assert(_tabletMetrics); _tabletInfos->SetTabletMetrics(_tabletMetrics); _memControlStrategy = _tabletFactory->CreateMemoryControlStrategy(_buildMemoryQuotaSynchronizer); if (_memControlStrategy == nullptr) { _memControlStrategy.reset(new DefaultMemoryControlStrategy(_tabletOptions, _buildMemoryQuotaSynchronizer)); } TABLET_LOG(INFO, "end prepare resource"); return Status::OK(); } std::shared_ptr<document::IDocumentParser> Tablet::CreateDocumentParser() { // TODO: support built in indexlibv2::document::DocumentParser return std::shared_ptr<document::IDocumentParser>(); } std::unique_ptr<ITabletFactory> Tablet::CreateTabletFactory(const std::string& tableType, const std::shared_ptr<config::TabletOptions>& options) const { auto tabletFactoryCreator = TabletFactoryCreator::GetInstance(); auto tabletFactory = tabletFactoryCreator->Create(tableType); if (!tabletFactory) { TABLET_LOG(ERROR, "create tablet factory with type [%s] failed, registered types [%s]", tableType.c_str(), autil::legacy::ToJsonString(tabletFactoryCreator->GetRegisteredType(), true).c_str()); return nullptr; } if (!tabletFactory->Init(options, _metricsManager.get())) { TABLET_LOG(ERROR, "init tablet factory with type [%s] failed", tableType.c_str()); return nullptr; } return tabletFactory; } Status 
Tablet::InitIndexDirectory(const IndexRoot& indexRoot) { TABLET_LOG(INFO, "begin init index directory, IndexRoot [%s]", indexRoot.ToString().c_str()); auto metricProvider = std::make_shared<indexlib::util::MetricProvider>(_metricsManager->GetMetricsReporter()); indexlib::file_system::FileSystemOptions fsOptions; // TODO(hanyao): set default root path on load config list // TODO(hanyao): get config from options // memory quota // file block cache controller fsOptions.outputStorage = indexlib::file_system::FSST_MEM; if (_tabletOptions->GetBuildConfig().GetIsEnablePackageFile()) { fsOptions.outputStorage = indexlib::file_system::FSST_PACKAGE_MEM; } fsOptions.memoryQuotaControllerV2 = _tabletMemoryQuotaController; fsOptions.loadConfigList = _tabletOptions->GetLoadConfigList(); fsOptions.fileBlockCacheContainer = _fileBlockCacheContainer; fsOptions.redirectPhysicalRoot = _tabletOptions->IsOnline(); std::string primaryRoot = _tabletOptions->FlushRemote() ? indexRoot.GetRemoteRoot() : indexRoot.GetLocalRoot(); std::string fenceName = Fence::GenerateNewFenceName(_tabletOptions->FlushRemote(), _tabletInfos->GetTabletId()); auto status = RecoverIndexInfo(primaryRoot, fenceName); if (!status.IsOK()) { TABLET_LOG(ERROR, "recover index from[%s][%s] failed", primaryRoot.c_str(), fenceName.c_str()); return status; } std::string fenceRoot = PathUtil::JoinPath(primaryRoot, fenceName); auto [fsStatus, fsPtr] = indexlib::file_system::FileSystemCreator::Create( _tabletInfos->GetTabletName(), fenceRoot, fsOptions, metricProvider, /*isOverride=*/false, indexlib::file_system::FenceContext::NoFence()) .StatusWith(); if (!fsStatus.IsOK()) { TABLET_LOG(ERROR, "init file system with fence[%s][%s] failed", primaryRoot.c_str(), fenceName.c_str()); return fsStatus; } bool exist = false; status = indexlib::file_system::FslibWrapper::IsExist(fenceRoot, exist).Status(); if (!status.IsOK()) { TABLET_LOG(ERROR, "is exist on [%s] failed: %s", fenceRoot.c_str(), status.ToString().c_str()); return 
status; } if (exist) { status = fsPtr ->MountVersion(fenceRoot, INVALID_VERSIONID, /*logicalPath=*/"", indexlib::file_system::FSMT_READ_ONLY, nullptr) .Status(); if (!status.IsOK()) { TABLET_LOG(ERROR, "mount fence[%s] failed: %s", fenceRoot.c_str(), status.ToString().c_str()); return status; } } fsPtr->SetDefaultRootPath(indexRoot.GetLocalRoot(), indexRoot.GetRemoteRoot()); Fence fence(primaryRoot, fenceName, std::move(fsPtr)); _fence = std::move(fence); TABLET_LOG(INFO, "end init file system, IndexRoot [%s], newFence[%s]", indexRoot.ToString().c_str(), _fence.GetFenceName().c_str()); return Status::OK(); } std::pair<Status, Version> Tablet::RecoverLatestVersion(const std::string& path) const { auto directory = indexlib::file_system::Directory::GetPhysicalDirectory(path); Version emptyVersion; if (!directory) { auto status = Status::IOError("get path[%s] physical root failed", path.c_str()); TABLET_LOG(ERROR, "%s", status.ToString().c_str()); return {status, emptyVersion}; } // remote index can be cleaned by VersionCleaner only auto [status, version] = IndexRecoverStrategy::RecoverLatestVersion(directory, /*cleanBrokenSegments=*/_tabletOptions->FlushLocal()); if (!status.IsOK()) { TABLET_LOG(ERROR, "recover latest version from [%s] failed", path.c_str()); return {status, emptyVersion}; } return {Status::OK(), version}; } Status Tablet::RecoverIndexInfo(const std::string& indexRoot, const std::string& fenceName) { assert(!indexRoot.empty()); auto [status, version] = RecoverLatestVersion(PathUtil::JoinPath(indexRoot, fenceName)); if (!status.IsOK()) { return status; } _idGenerator->UpdateBaseVersion(version); auto [mainStatus, mainVersion] = RecoverLatestVersion(indexRoot); if (!mainStatus.IsOK()) { return mainStatus; } _idGenerator->UpdateBaseVersion(mainVersion); return Status::OK(); } Status Tablet::FlushUnsafe() { [[maybe_unused]] bool r = SealSegmentUnsafe(); auto status = _tabletDumper->Dump(_tabletOptions->GetBuildConfig().GetDumpThreadCount()); return 
status; } Status Tablet::Flush() { TABLET_LOG(INFO, "flush tablet begin"); Status result; { std::lock_guard<std::mutex> guard(_dataMutex); result = FlushUnsafe(); } TABLET_LOG(INFO, "flush tablet end , result [%s]", result.ToString().c_str()); return result; } Status Tablet::Seal() { std::lock_guard<std::mutex> guard(_dataMutex); TABLET_LOG(INFO, "seal tablet begin"); if (_sealedSourceLocator) { TABLET_LOG(INFO, "tablet already sealed on srouce locator [%s]", _sealedSourceLocator.value().DebugString().c_str()); return Status::OK(); } auto slice = _tabletData->CreateSlice(); if (slice.empty()) { TABLET_LOG(INFO, "empty tablet will not seal"); return Status::OK(); } auto status = FlushUnsafe(); if (!status.IsOK()) { TABLET_LOG(ERROR, "dump failed: %s", status.ToString().c_str()); return status; } status = _tabletDumper->Seal(_tabletOptions->GetBuildConfig().GetDumpThreadCount()); if (!status.IsOK()) { TABLET_LOG(ERROR, "seal failed: %s", status.ToString().c_str()); return status; } auto latestLocator = GetTabletInfos()->GetLatestLocator(); _sealedSourceLocator = latestLocator; TABLET_LOG(INFO, "seal tablet end, latestLocator [%s]", latestLocator.DebugString().c_str()); return Status::OK(); } bool Tablet::SealSegmentUnsafe() { if (!_tabletWriter) { return false; } auto segmentDumper = _tabletWriter->CreateSegmentDumper(); if (!segmentDumper) { return false; } TABLET_LOG(INFO, "seal segment [%d]", segmentDumper->GetSegmentId()); _tabletDumper->PushSegmentDumper(std::move(segmentDumper)); CloseWriterUnsafe(); return true; } // Check whether long time no dump, if true, dump the current building segment. void Tablet::DumpSegmentOverInterval() { std::lock_guard<std::mutex> guard(_dataMutex); if (!_tabletWriter) { return; } // NeedDumpOverInterval() is true when the delta of current time and last dump's timestamp is less // then the MaxDumpInterval, as Build() will update the last dump's timestamp also, the check is // under lock. 
if (!_tabletDumper->NeedDumpOverInterval()) { return; } bool sealed = SealSegmentUnsafe(); TABLET_LOG(INFO, "seal segment by internal [%d s] done [%d]", _tabletOptions->GetMaxDumpIntervalSecond(), sealed); if (sealed) { auto status = ReopenNewSegment(GetTabletSchema()); if (!status.IsOK()) { TABLET_LOG(ERROR, "reopen new segment after seal failed"); CloseWriterUnsafe(); } } } Status Tablet::CleanIndexFiles(const std::vector<versionid_t>& reservedVersionIds) { if (unlikely(_tabletOptions->IsReadOnly())) { TABLET_LOG(WARN, "tablet is open in readonly mode, clean index do nothing"); return Status::OK(); } std::lock_guard<std::mutex> lockCleaner(_cleanerMutex); OnDiskIndexCleaner cleaner(_tabletInfos->GetIndexRoot().GetLocalRoot(), _tabletInfos->GetTabletName(), _tabletOptions->GetBuildConfig().GetKeepVersionCount(), _tabletReaderContainer.get()); return cleaner.Clean(reservedVersionIds); } Status Tablet::CleanUnreferencedDeployFiles(const std::set<std::string>& toKeepFiles) { if (unlikely(_tabletOptions->IsReadOnly())) { TABLET_LOG(WARN, "tablet is open in readonly mode, clean index do nothing"); return Status::OK(); } std::lock_guard<std::mutex> lockCleaner(_cleanerMutex); OnDiskIndexCleaner cleaner(_tabletInfos->GetIndexRoot().GetLocalRoot(), _tabletInfos->GetTabletName(), _tabletOptions->GetBuildConfig().GetKeepVersionCount(), _tabletReaderContainer.get()); return cleaner.Clean(GetRootDirectory(), _tabletInfos->GetLoadedPublishVersion(), toKeepFiles); } Status Tablet::CheckAlterTableCompatible(const std::shared_ptr<TabletData>& tabletData, const std::shared_ptr<config::ITabletSchema>& oldSchema, const std::shared_ptr<config::ITabletSchema>& newSchema) { // check ignore new schema // env ignore_alter_table_schema_ids = 1;2;3 std::string ignoreAlterSchemaIdStr; if (autil::EnvUtil::getEnvWithoutDefault("ignore_alter_table_schema_ids", ignoreAlterSchemaIdStr)) { std::vector<schemaid_t> ignoreSchemaIds; autil::StringUtil::fromString(ignoreAlterSchemaIdStr, 
ignoreSchemaIds, ";"); auto schemaId = newSchema->GetSchemaId(); for (auto ignoreSchema : ignoreSchemaIds) { if (schemaId == ignoreSchema) { TABLET_LOG(ERROR, "alter table with ignore schema id [%d], all ignore schemas [%s]", schemaId, ignoreAlterSchemaIdStr.c_str()); return Status::InvalidArgs(); } } } // check duplicate index for (auto indexConfig : newSchema->GetIndexConfigs()) { auto indexType = indexConfig->GetIndexType(); auto indexName = indexConfig->GetIndexName(); if (!oldSchema->GetIndexConfig(indexType, indexName)) { // indexConfig is add index config // check index config has in old segment auto slice = tabletData->CreateSlice(); for (auto segment : slice) { auto segmentSchema = segment->GetSegmentSchema(); if (segmentSchema->GetIndexConfig(indexType, indexName)) { TABLET_LOG(ERROR, "alter table add same old index, wait old index delete, then readd"); return Status::InvalidArgs(); } } } } return config::TabletSchema::CheckUpdateSchema(oldSchema, newSchema); } Status Tablet::AlterTable(const std::shared_ptr<config::ITabletSchema>& newSchema) { if (unlikely(_tabletOptions->IsReadOnly())) { return Status::Unimplement("readonly tablet not support build"); } TABLET_LOG(INFO, "begin alter tablet for schema[%u]", newSchema->GetSchemaId()); auto status = TabletSchemaLoader::ResolveSchema(_tabletOptions, _tabletInfos->GetIndexRoot().GetRemoteRoot(), newSchema.get()); if (!status.IsOK()) { TABLET_LOG(ERROR, "resolve schema [%u] failed: %s", newSchema->GetSchemaId(), status.ToString().c_str()); return status; } auto oldSchema = GetTabletSchema(); if (oldSchema->GetSchemaId() == newSchema->GetSchemaId()) { TABLET_LOG(WARN, "schema [%s] already exist", newSchema->GetSchemaFileName().c_str()); return Status::OK(); } { std::lock_guard<std::mutex> guard(_dataMutex); status = CheckAlterTableCompatible(_tabletData, oldSchema, newSchema); if (!status.IsOK()) { TABLET_LOG(ERROR, "new schema [%u] not compatible with old schema [%u]: %s", newSchema->GetSchemaId(), 
oldSchema->GetSchemaId(), status.ToString().c_str()); return status; } } status = _tabletSchemaMgr->StoreSchema(*newSchema); if (status.IsExist()) { TABLET_LOG(WARN, "schema [%s] already exist", newSchema->GetSchemaFileName().c_str()); } else if (!status.IsOK()) { TABLET_LOG(ERROR, "store schema [%u] failed: %s", newSchema->GetSchemaId(), status.ToString().c_str()); return status; } std::lock_guard<std::mutex> guard(_dataMutex); status = DoAlterTable(newSchema); if (!status.IsOK()) { return status; } _tabletDumper->AlterTable(newSchema->GetSchemaId()); TABLET_LOG(INFO, "end alter tablet for schema[%u]", newSchema->GetSchemaId()); return Status::OK(); } Status Tablet::DoAlterTable(const std::shared_ptr<config::ITabletSchema>& newSchema) { bool segmentSealed = SealSegmentUnsafe(); _tabletSchemaMgr->InsertSchemaToCache(newSchema); auto status = Status::OK(); if (segmentSealed) { status = ReopenNewSegment(newSchema); } else { RefreshStrategy strategy = RefreshStrategy::KEEP; if (_tabletWriter) { if (_tabletWriter->IsDirty()) { TABLET_LOG(ERROR, "tablet writer is dirty, but segment is not sealed!"); return Status::InternalError("tablet writer is dirty, but segment is not sealed!"); } strategy = RefreshStrategy::REPLACE_BUILDING_SEGMENT; } TABLET_LOG(INFO, "refresh tablet data by [%s]", strategy == RefreshStrategy::KEEP ? "keep" : "replace building"); status = RefreshTabletData(strategy, newSchema); } if (!status.IsOK()) { TABLET_LOG(ERROR, "reopen new segment failed: %s", status.ToString().c_str()); CloseWriterUnsafe(); return status; } TABLET_LOG(INFO, "schema[%u] updated", newSchema->GetSchemaId()); return Status::OK(); } Status Tablet::RenewFenceLease(bool createIfNotExist) { if (!_tabletOptions->IsLeader() || !_tabletOptions->FlushRemote()) { // only public fence need renew lease. 
return Status::OK(); } auto status = _fence.RenewFenceLease(createIfNotExist); if (!status.IsOK()) { TABLET_LOG(ERROR, "%s", status.ToString().c_str()); } return status; } std::shared_ptr<framework::ResourceCleaner> Tablet::CreateResourceCleaner() { return std::make_shared<ResourceCleaner>(_tabletReaderContainer.get(), GetRootDirectory(), _tabletInfos->GetTabletName(), !_tabletOptions->IsLeader(), &_cleanerMutex, _tabletOptions->GetBackgroundTaskConfig().GetCleanResourceIntervalMs()); } bool Tablet::StartIntervalTask() { TABLET_LOG(INFO, "begin start interval task"); if (!_taskScheduler) { TABLET_LOG(WARN, "TaskScheduler is nullptr."); return true; } _taskScheduler->CleanTasks(); const config::BackgroundTaskConfig& taskConfig = _tabletOptions->GetBackgroundTaskConfig(); auto startTask = [this](auto taskType, auto func, auto interval) { auto taskString = TaskTypeToString(taskType); if (interval <= 0) { TABLET_LOG(INFO, "task[%s] disabled, interval[%ld]", taskString, interval); return true; } if (!_taskScheduler->StartIntervalTask(taskString, std::move(func), std::chrono::milliseconds(interval))) { TABLET_LOG(ERROR, "Start interval task %s failed.", taskString); return false; } return true; }; if (!startTask( TaskType::TT_RENEW_FENCE_LEASE, [this]() { [[maybe_unused]] auto status = RenewFenceLease(/*createIfNotExist=*/false); }, taskConfig.GetRenewFenceLeaseIntervalMs())) { return false; } auto cleanResourceTask = CreateResourceCleaner(); if (!startTask( TaskType::TT_CLEAN_RESOURCE, [this, cleanResourceTask]() { cleanResourceTask->Run(); if (_enableMemReclaimer) { MemoryReclaim(); } }, taskConfig.GetCleanResourceIntervalMs())) { return false; } if (!startTask( TaskType::TT_INTERVAL_DUMP, [this]() { DumpSegmentOverInterval(); }, taskConfig.GetDumpIntervalMs())) { return false; } if (!startTask( TaskType::TT_REPORT_METRICS, [this]() { ReportMetrics(GetTabletWriter()); }, taskConfig.GetReportMetricsIntervalMs())) { return false; } if (!startTask( 
TaskType::TT_ASYNC_DUMP, [this]() { (void)_tabletDumper->Dump(_tabletOptions->GetBuildConfig().GetDumpThreadCount()); }, taskConfig.GetAsyncDumpIntervalMs())) { return false; } if (_versionMerger && _tabletOptions->AutoMerge()) { if (!startTask( TaskType::TT_MERGE_VERSION, [this, versionMerger = _versionMerger]() { versionMerger->Run() .via(_taskScheduler->GetTaskScheduler()->GetExecutor()) .start([versionMerger = _versionMerger](future_lite::Try<std::pair<Status, versionid_t>>&&) {}); }, taskConfig.GetMergeIntervalMs())) { return false; } } // only for compute-storage separation if (_tabletOptions->GetNeedReadRemoteIndex() && DeployIndexUtil::NeedSubscribeRemoteIndex(_tabletInfos->GetIndexRoot().GetRemoteRoot())) { if (!startTask( TaskType::TT_SUBSCRIBE_REMOTE_INDEX, [this]() { DeployIndexUtil::SubscribeRemoteIndex(_tabletInfos->GetIndexRoot().GetRemoteRoot(), _tabletInfos->GetLoadedPublishVersion().GetVersionId()); }, taskConfig.GetSubscribeRemoteIndexIntervalMs())) { return false; } } TABLET_LOG(INFO, "end start interval task"); return true; } void Tablet::MemoryReclaim() { if (_memReclaimer) { _memReclaimer->TryReclaim(); } } void Tablet::PrepareTabletMetrics(const std::shared_ptr<TabletWriter>& tabletWriter) { auto tabletData = GetTabletData(); _tabletMetrics->UpdateMetrics(_tabletReaderContainer, tabletData, tabletWriter, _tabletOptions->GetBuildMemoryQuota(), _fence.GetFileSystem()); _memControlStrategy->SyncMemoryQuota(_tabletMetrics); MemoryStatus memoryStatus = CheckMemoryStatus(); _tabletInfos->SetMemoryStatus(memoryStatus); _tabletMetrics->SetMemoryStatus(memoryStatus); _tabletMetrics->PrintMetrics(_tabletInfos->GetTabletName(), _tabletOptions->GetPrintMetricsInterval()); } void Tablet::ReportMetrics(const std::shared_ptr<TabletWriter>& tabletWriter) { PrepareTabletMetrics(tabletWriter); _metricsManager->ReportMetrics(); _fence.GetFileSystem()->ReportMetrics(); if (tabletWriter != nullptr) { tabletWriter->ReportMetrics(); } } MemoryStatus 
Tablet::CheckMemoryStatus() const { auto memStatus = _memControlStrategy->CheckRealtimeIndexMemoryQuota(_tabletMetrics); if (memStatus != MemoryStatus::OK) { return memStatus; } return _memControlStrategy->CheckTotalMemoryQuota(_tabletMetrics); } std::shared_ptr<TabletWriter> Tablet::GetTabletWriter() const { std::lock_guard<std::mutex> guard(_dataMutex); return _tabletWriter; } BuildResource Tablet::GenerateBuildResource(const std::string& counterPrefix) { BuildResource buildResource; buildResource.memController = _tabletMemoryQuotaController; buildResource.metricsManager = _metricsManager.get(); buildResource.buildDocumentMetrics = _tabletMetrics->GetBuildDocumentMetrics(); auto buildingMemLimit = _tabletOptions->GetBuildConfig().GetBuildingMemoryLimit(); if (buildingMemLimit == -1) { buildingMemLimit = GetSuggestBuildingSegmentMemoryUse(); TABLET_LOG(INFO, "using suggest building segment memory [%s]", autil::UnitUtil::GiBDebugString(buildingMemLimit).c_str()); } buildResource.buildingMemLimit = buildingMemLimit; buildResource.counterMap = _metricsManager->GetCounterMap(); buildResource.counterPrefix = counterPrefix; buildResource.buildResourceMetrics = std::make_shared<indexlib::util::BuildResourceMetrics>(); buildResource.buildResourceMetrics->Init(); buildResource.indexMemoryReclaimer = _memReclaimer.get(); buildResource.consistentModeBuildThreadPool = _consistentModeBuildThreadPool; buildResource.inconsistentModeBuildThreadPool = _inconsistentModeBuildThreadPool; auto tabletData = GetTabletData(); if (tabletData) { auto slice = tabletData->CreateSlice(); for (auto seg : slice) { buildResource.segmentDirs.push_back({seg->GetSegmentId(), seg->GetSegmentDirectory()}); } } return buildResource; } ReadResource Tablet::GenerateReadResource() const { ReadResource readResource; readResource.metricsManager = _metricsManager.get(); readResource.indexMemoryReclaimer = _memReclaimer; readResource.rootDirectory = GetRootDirectory(); readResource.searchCache = 
_searchCache; return readResource; } void Tablet::SetRunAfterCloseFunction(const Tablet::RunAfterCloseFunc&& func) { _closeRunFunc = std::move(func); } std::pair<Status, versionid_t> Tablet::ExecuteTask(const Version& sourceVersion, const std::string& taskType, const std::string& taskName, const std::map<std::string, std::string>& params) { if (!_versionMerger) { RETURN2_IF_STATUS_ERROR(Status::InvalidArgs(), INVALID_VERSIONID, "version merger is nullptr"); } return future_lite::coro::syncAwait(_versionMerger.get()->ExecuteTask(sourceVersion, taskType, taskName, params)); } Status Tablet::ImportExternalFiles(const std::string& bulkloadId, const std::vector<std::string>& externalFiles, const std::shared_ptr<ImportExternalFileOptions>& options, Action action, int64_t eventTimeInSecs) { std::lock_guard<std::mutex> lock(_dataMutex); Status status; if (bulkloadId.empty()) { status = Status::InvalidArgs("bulkload id is empty"); TABLET_LOG(ERROR, "%s", status.ToString().c_str()); return status; } // check ignore bulkload id // env IGNORE_BULKLOAD_ID = id1;id2;id3 std::string ignoreBulkloadIdStr; if (autil::EnvUtil::getEnvWithoutDefault("IGNORE_BULKLOAD_ID", ignoreBulkloadIdStr)) { std::vector<std::string> ignoreBulkloadIds; autil::StringUtil::fromString(ignoreBulkloadIdStr, ignoreBulkloadIds, ";"); for (auto ignoreBulkloadId : ignoreBulkloadIds) { if (bulkloadId == ignoreBulkloadId) { status = Status::InvalidArgs("bulkload with ignore bulkload id [%s], all ignore bulkload id [%s]", bulkloadId.c_str(), ignoreBulkloadIdStr.c_str()); TABLET_LOG(ERROR, "%s", status.ToString().c_str()); return status; } } } if (action == Action::ADD) { // avoid duplicate bulkload call const auto& currentOnDiskVersion = GetTabletData()->GetOnDiskVersion(); auto indexTask = currentOnDiskVersion.GetIndexTaskQueue()->Get(BULKLOAD_TASK_TYPE, bulkloadId); if (indexTask != nullptr) { return status; } if (_tabletCommitter->HasIndexTask(BULKLOAD_TASK_TYPE, bulkloadId)) { return status; } 
RETURN_IF_STATUS_ERROR(FlushUnsafe(), "flush before import external file failed, isLeader[%d]", _tabletOptions->IsLeader()); } if (_tabletOptions->IsLeader()) { IndexTaskMetaCreator creator; std::map<std::string, std::string> params; std::string taskName = "bulkload"; params[PARAM_BULKLOAD_ID] = bulkloadId; params[PARAM_EXTERNAL_FILES] = autil::legacy::ToJsonString(externalFiles); if (action == Action::ADD) { std::string comment; if (externalFiles.empty()) { status = Status::InvalidArgs("external file list is empty."); } else if (options == nullptr || !options->IsValidMode()) { status = Status::InvalidArgs("invalid import external file options."); } if (!status.IsOK()) { action = Action::SUSPEND; comment = status.ToString(); TABLET_LOG(ERROR, "add index task failed, bulkload id is %s, status: %s", bulkloadId.c_str(), status.ToString().c_str()); } else { params[PARAM_IMPORT_EXTERNAL_FILE_OPTIONS] = autil::legacy::ToJsonString(options); } uint64_t newSegId = _idGenerator->GetNextSegmentId(); params[PARAM_LAST_SEQUENCE_NUMBER] = autil::StringUtil::toString(newSegId << 24); auto meta = creator.TaskType(BULKLOAD_TASK_TYPE) .TaskTraceId(bulkloadId) .TaskName(taskName) .Params(params) .EventTimeInSecs(eventTimeInSecs) .Comment(comment) .Create(); _tabletCommitter->HandleIndexTask(meta, action); _idGenerator->CommitNextSegmentId(); } else if (action == Action::OVERWRITE) { if (externalFiles.empty()) { auto status = Status::InvalidArgs("external file list is empty."); TABLET_LOG(ERROR, "overwrite index task failed, bulkload id is %s, status: %s", bulkloadId.c_str(), status.ToString().c_str()); return status; } if (options == nullptr || !options->IsValidMode()) { auto status = Status::InvalidArgs("invalid import external file options."); TABLET_LOG(ERROR, "overwrite index task failed, bulkload id is %s, status: %s", bulkloadId.c_str(), status.ToString().c_str()); return status; } params[PARAM_IMPORT_EXTERNAL_FILE_OPTIONS] = autil::legacy::ToJsonString(options); auto meta = 
creator.TaskType(BULKLOAD_TASK_TYPE) .TaskTraceId(bulkloadId) .TaskName(taskName) .Params(params) .EventTimeInSecs(eventTimeInSecs) .Create(); _tabletCommitter->HandleIndexTask(meta, action); } else if (action == Action::ABORT) { auto meta = creator.TaskType(BULKLOAD_TASK_TYPE).TaskTraceId(bulkloadId).TaskName(taskName).Create(); _tabletCommitter->HandleIndexTask(meta, action); } else if (action == Action::SUSPEND) { auto meta = creator.TaskType(BULKLOAD_TASK_TYPE).TaskTraceId(bulkloadId).TaskName(taskName).Create(); _tabletCommitter->HandleIndexTask(meta, action); } else { status = Status::InvalidArgs("invalid action, usage action=%s|%s|%s|%s", ActionConvertUtil::ActionToStr(Action::ADD).c_str(), ActionConvertUtil::ActionToStr(Action::ABORT).c_str(), ActionConvertUtil::ActionToStr(Action::OVERWRITE).c_str(), ActionConvertUtil::ActionToStr(Action::SUSPEND).c_str()); } } if (!status.IsOK()) { TABLET_LOG(ERROR, "import external file failed, status: %s, bulkload id %s, external files %s, options %s", status.ToString().c_str(), bulkloadId.c_str(), autil::legacy::ToJsonString(externalFiles, /*isCompact=*/true).c_str(), autil::legacy::ToJsonString(options, /*isCompact=*/true).c_str()); return status; } TABLET_LOG(INFO, "import external file, isLeader[%d], bulkload id %s, external files %s, options %s", _tabletOptions->IsLeader(), bulkloadId.c_str(), autil::legacy::ToJsonString(externalFiles, /*isCompact=*/true).c_str(), autil::legacy::ToJsonString(options, /*isCompact=*/true).c_str()); return status; } Status Tablet::Import(const std::vector<Version>& versions, const ImportOptions& options) { std::shared_ptr<ITabletImporter> importer = _tabletFactory->CreateTabletImporter(options.GetImportType()); if (importer == nullptr) { TABLET_LOG(ERROR, "Import failed: not support import type %s.", options.GetImportType().c_str()); return Status::Unimplement(); } std::shared_ptr<TabletData> currentTabletData; { std::lock_guard<std::mutex> lock(_dataMutex); currentTabletData = 
GetTabletData(); } const auto& baseVersion = currentTabletData->GetOnDiskVersion(); std::vector<Version> validVersions; auto status = importer->Check(versions, &baseVersion, options, &validVersions); if (status.IsExist()) { TABLET_LOG(WARN, "input versions already exist, no need to be imported"); return Status::OK(); } RETURN_IF_STATUS_ERROR(status, "import check versions failed"); _tabletCommitter->Import(validVersions, importer, options); return Status::OK(); } std::shared_ptr<config::ITabletSchema> Tablet::GetReadSchema(const std::shared_ptr<TabletData>& tabletData) { auto readSchema = tabletData->GetOnDiskVersionReadSchema(); return readSchema ? readSchema : GetTabletSchema(); } Version Tablet::MakeEmptyVersion(schemaid_t schemaId) const { Version v; v.SetSchemaId(schemaId); v.SetReadSchemaId(schemaId); return v; } void Tablet::SetTabletData(std::shared_ptr<TabletData> tabletData) { std::lock_guard<std::mutex> lock(_tabletDataMutex); _tabletData = tabletData; } std::shared_ptr<TabletData> Tablet::GetTabletData() const { std::lock_guard<std::mutex> lock(_tabletDataMutex); return _tabletData; } StatusOr<std::shared_ptr<indexlibv2::framework::VersionDeployDescription>> Tablet::CreateVersionDeployDescription(versionid_t versionId) { if (!_tabletOptions->IsOnline()) { return nullptr; } if (versionId < 0 || (versionId & Version::PRIVATE_VERSION_ID_MASK) || !_tabletOptions->GetOnlineConfig().EnableLocalDeployManifestChecking()) { return nullptr; } auto versionDpDesc = std::make_shared<indexlibv2::framework::VersionDeployDescription>(); if (!indexlibv2::framework::VersionDeployDescription::LoadDeployDescription( _tabletInfos->GetIndexRoot().GetLocalRoot(), versionId, versionDpDesc.get())) { TABLET_LOG(ERROR, "load version deploy description failed in CreateVersionDeployDescription, versionId[%d], " "indexRootPath[%s]", static_cast<int>(versionId), _tabletInfos->GetIndexRoot().GetLocalRoot().c_str()); return Status::Corruption(); } return versionDpDesc; } std::string 
Tablet::GetDiffSegmentDebugString(const Version& targetVersion, const std::shared_ptr<TabletData>& currentTabletData) const { std::set<segmentid_t> currentSegmentIds; std::set<segmentid_t> targetSegmentIds; std::set<segmentid_t> allSegmentIds; if (currentTabletData) { for (auto segment : currentTabletData->CreateSlice()) { currentSegmentIds.insert(segment->GetSegmentId()); allSegmentIds.insert(segment->GetSegmentId()); } } for (auto [segmentId, schemaId] : targetVersion) { targetSegmentIds.insert(segmentId); allSegmentIds.insert(segmentId); } std::stringstream ss; for (auto segmentId : allSegmentIds) { if (ss.tellp()) { ss << ", "; } bool isInCurrentSegments = currentSegmentIds.count(segmentId) > 0; bool isInTargetSegments = targetSegmentIds.count(segmentId) > 0; if (!isInCurrentSegments && isInTargetSegments) { ss << "+" << segmentId; } else if (isInCurrentSegments && !isInTargetSegments) { auto sizeInG = autil::UnitUtil::GiB(currentTabletData->GetSegment(segmentId)->EvaluateCurrentMemUsed()); ss << "-" << segmentId << ":" << autil::StringUtil::fToString(sizeInG, "%.1f") << "G"; } else { assert(isInCurrentSegments && isInTargetSegments); ss << "=" << segmentId; } } return ss.str(); } void Tablet::CloseWriterUnsafe() { if (_tabletWriter) { _tabletWriter->Close(); _tabletWriter.reset(); } } #undef TABLET_LOG } // namespace indexlibv2::framework