in aios/storage/indexlib/indexlib/partition/custom_online_partition.cpp [210:384]
IndexPartition::OpenStatus CustomOnlinePartition::DoOpen(const string& primaryDir, const string& secondaryDir,
const IndexPartitionSchemaPtr& schema,
const IndexPartitionOptions& options,
versionid_t targetVersionId, bool isForceOpen)
{
IE_PREFIX_LOG(INFO, "Open Index Partition: primary[%s], secondary[%s]", primaryDir.c_str(), secondaryDir.c_str());
IndexPhaseGuard phaseGuard(mIndexPhase, OPENING);
mReaderContainer.reset(new ReaderContainer);
if (!mDumpSegmentQueue) {
mDumpSegmentQueue.reset(new DumpSegmentQueue());
}
mEnableAsyncDump =
(options.GetOnlineConfig().enableAsyncDumpSegment || options.GetBuildConfig(true).EnableAsyncDump());
int64_t ioExceptionRetryLimit = autil::EnvUtil::getEnv<int64_t>("CUSTOM_IO_EXCEPTION_RETRY_LIMIT", -1);
int64_t ioExceptionRetryInterval = autil::EnvUtil::getEnv<int64_t>("CUSTOM_IO_EXCEPTION_RETRY_INTERVAL", -1);
if (ioExceptionRetryLimit != -1) {
mIOExceptionRetryLimit = ioExceptionRetryLimit;
}
if (ioExceptionRetryInterval != -1) {
mIOExceptionRetryInterval = ioExceptionRetryInterval;
}
IE_LOG(INFO, "IO exception retry limit = [%zu], retry interval = [%zu]", mIOExceptionRetryLimit,
mIOExceptionRetryInterval);
auto customizedParams = options.GetBuildConfig(true).GetCustomizedParams();
int64_t checkRtBuildDumpLimit = ParamsUtil::GetParam<int64_t>("check_rt_build_dump_limit", customizedParams, -1);
int64_t highMemUsedWatermark = ParamsUtil::GetParam<int64_t>("high_mem_used_watermark", customizedParams, -1);
int64_t lowMemUsedWatermark = ParamsUtil::GetParam<int64_t>("low_mem_used_watermark", customizedParams, -1);
mNeedCheckRealTimeBuild = ParamsUtil::GetParam<bool>("need_check_rt_build", customizedParams, true);
if (checkRtBuildDumpLimit != -1) {
mCheckRtBuildDumpLimit = max(int64_t(0), checkRtBuildDumpLimit);
}
if (highMemUsedWatermark != -1) {
mHighMemUsedWatermark = highMemUsedWatermark;
}
if (lowMemUsedWatermark != -1) {
mLowMemUsedWatermark = lowMemUsedWatermark;
}
IE_LOG(INFO,
"Check rt build dump limit = [%zu], "
"high mem used watermark = [%zu], low mem used watermark = [%zu]",
mCheckRtBuildDumpLimit, mHighMemUsedWatermark, mLowMemUsedWatermark);
IndexPartition::OpenStatus openStatus =
IndexPartition::Open(primaryDir, secondaryDir, schema, options, targetVersionId);
if (openStatus != OS_OK) {
return openStatus;
}
mReaderContainer.reset(new ReaderContainer);
if (!mRealtimeQuotaController) {
mRealtimeQuotaController.reset(
new MemoryQuotaController(options.GetOnlineConfig().maxRealtimeMemSize * 1024 * 1024));
}
mRtMemQuotaSynchronizer = CreateRealtimeMemoryQuotaSynchronizer(mRealtimeQuotaController);
mClosed = true;
Version onDiskVersion;
VersionLoader::GetVersionS(secondaryDir, onDiskVersion, targetVersionId);
auto versionDpDesc =
CreateVersionDeployDescription(primaryDir, mOptions.GetOnlineConfig(), onDiskVersion.GetVersionId());
if (versionDpDesc == nullptr) {
string schemaName = (schema == nullptr) ? "NULL-schema" : schema->GetSchemaName();
IE_LOG(ERROR, "create version deploy description failed in table[%s], versionId[%d]", schemaName.c_str(),
static_cast<int>(onDiskVersion.GetVersionId()));
return IndexPartition::OS_FAIL;
}
auto versionLevelLifecycleTable = versionDpDesc->GetLifecycleTable();
if (versionLevelLifecycleTable == nullptr) {
IE_LOG(ERROR, "get lifecycle Table from versionDpDesc error, versionId[%d]",
static_cast<int>(onDiskVersion.GetVersionId()));
return IndexPartition::OS_FAIL;
}
if (unlikely(onDiskVersion.GetVersionId() == INVALID_VERSIONID)) {
const std::string& root = secondaryDir;
THROW_IF_FS_ERROR(mFileSystem->MountFile(root, SCHEMA_FILE_NAME, SCHEMA_FILE_NAME, FSMT_READ_ONLY, -1, false),
"mount schema file failed");
THROW_IF_FS_ERROR(mFileSystem->MountFile(root, INDEX_FORMAT_VERSION_FILE_NAME, INDEX_FORMAT_VERSION_FILE_NAME,
FSMT_READ_ONLY, -1, false),
"mount index format version file failed");
} else {
if (!mOptions.GetOnlineConfig().NeedReadRemoteIndex()) {
THROW_IF_FS_ERROR(mFileSystem->MountVersion(primaryDir, onDiskVersion.GetVersionId(), "", FSMT_READ_ONLY,
versionLevelLifecycleTable),
"mount version failed");
} else {
THROW_IF_FS_ERROR(mFileSystem->MountVersion(secondaryDir, onDiskVersion.GetVersionId(), "", FSMT_READ_ONLY,
versionLevelLifecycleTable),
"mount version failed");
}
}
ScopedLock lock(mDataLock);
ScopedLock schemaLock(mSchemaLock);
IE_PREFIX_LOG(INFO, "current free memory is %ld MB", mPartitionMemController->GetFreeQuota() / (1024 * 1024));
// rt build disable package file
mOptions.GetBuildConfig(true).enablePackageFile = false;
mSchema = SchemaAdapter::LoadSchema(GetFileSystemRootDirectory(), onDiskVersion.GetSchemaVersionId());
CheckParam(mSchema, mOptions);
if (schema) {
mSchema->AssertCompatible(*schema);
}
mTableWriterMemController.reset(
new BlockMemoryQuotaController(mPartitionMemController, mSchema->GetSchemaName() + "_TableWriterMemory"));
mOnlinePartMetrics->RegisterMetrics(mSchema->GetTableType());
mCounterMap.reset(new CounterMap());
if (mOptions.GetOnlineConfig().loadRemainFlushRealtimeIndex && !mOptions.TEST_mReadOnly) {
// first open will recover rt index
mOptions.GetOnlineConfig().enableRecoverIndex = true;
}
mPluginManager = TablePluginLoader::Load(mIndexPluginPath, mSchema, mOptions);
if (!mPluginManager) {
IE_LOG(ERROR, "load index plugin failed for schema [%s]", mSchema->GetSchemaName().c_str());
return OS_FAIL;
}
if (!mTableFactory) {
mTableFactory.reset(new TableFactoryWrapper(mSchema, mOptions, mPluginManager));
if (!mTableFactory->Init()) {
IE_LOG(ERROR, "Init TableFactory failed in table[%s]", mSchema->GetSchemaName().c_str());
return OS_FAIL;
}
}
PartitionDataPtr newPartitionData =
CreateCustomPartitionData(onDiskVersion, index_base::Version(INVALID_VERSIONID), versionDpDesc,
mDumpSegmentQueue, GetReclaimTimestamp(onDiskVersion), false);
mPartitionDataHolder.Reset(newPartitionData);
// only recover once when open
mOptions.GetOnlineConfig().enableRecoverIndex = false;
mFileSystem->SwitchLoadSpeedLimit(
mOptions.GetOnlineConfig().enableLoadSpeedControlForOpen); // true for on, false for off
if (!mOptions.TEST_mReadOnly) {
mWriter.reset(new CustomOnlinePartitionWriter(mOptions, mTableFactory, mFlushedLocatorContainer,
mOnlinePartMetrics.get(), mPartitionName));
InitWriter(mPartitionDataHolder.Get(), mWriter);
}
InitResourceCalculator(mPartitionDataHolder.Get(), mWriter);
PartitionDataPtr clonedPartitionData(mPartitionDataHolder.Get()->Clone());
auto reader =
InitReaderWithMemoryLimit(clonedPartitionData, mWriter, index_base::Version(INVALID_VERSIONID), onDiskVersion);
if (reader) {
auto tableReader = reader->GetTableReader();
mSupportSegmentLevelReader = tableReader->SupportSegmentLevelReader();
}
// remove inc covered segment
CustomPartitionDataPtr typedData = DYNAMIC_POINTER_CAST(CustomPartitionData, newPartitionData);
typedData->RemoveObsoleteSegments();
SwitchReader(reader);
MEMORY_BARRIER();
mFileSystem->SwitchLoadSpeedLimit(true);
mLoadedIncVersion = mPartitionDataHolder.Get()->GetOnDiskVersion();
InitResourceCleaner();
CleanPartitionReaders();
CleanResource();
ReportMetrics();
if (!mReaderUpdater) {
mReaderUpdater.reset(new SearchReaderUpdater(mSchema->GetSchemaName()));
}
if (!PrepareIntervalTask()) {
return OS_FAIL;
}
mClosed = false;
mIsServiceNormal = true;
IE_PREFIX_LOG(INFO, "Open Index Partition End");
return OS_OK;
}