IndexPartition::OpenStatus CustomOnlinePartition::DoOpen()

in aios/storage/indexlib/indexlib/partition/custom_online_partition.cpp [210:384]


IndexPartition::OpenStatus CustomOnlinePartition::DoOpen(const string& primaryDir, const string& secondaryDir,
                                                         const IndexPartitionSchemaPtr& schema,
                                                         const IndexPartitionOptions& options,
                                                         versionid_t targetVersionId, bool isForceOpen)
{
    IE_PREFIX_LOG(INFO, "Open Index Partition: primary[%s], secondary[%s]", primaryDir.c_str(), secondaryDir.c_str());
    IndexPhaseGuard phaseGuard(mIndexPhase, OPENING);
    mReaderContainer.reset(new ReaderContainer);
    if (!mDumpSegmentQueue) {
        mDumpSegmentQueue.reset(new DumpSegmentQueue());
    }
    mEnableAsyncDump =
        (options.GetOnlineConfig().enableAsyncDumpSegment || options.GetBuildConfig(true).EnableAsyncDump());
    int64_t ioExceptionRetryLimit = autil::EnvUtil::getEnv<int64_t>("CUSTOM_IO_EXCEPTION_RETRY_LIMIT", -1);
    int64_t ioExceptionRetryInterval = autil::EnvUtil::getEnv<int64_t>("CUSTOM_IO_EXCEPTION_RETRY_INTERVAL", -1);
    if (ioExceptionRetryLimit != -1) {
        mIOExceptionRetryLimit = ioExceptionRetryLimit;
    }
    if (ioExceptionRetryInterval != -1) {
        mIOExceptionRetryInterval = ioExceptionRetryInterval;
    }
    IE_LOG(INFO, "IO exception retry limit = [%zu], retry interval = [%zu]", mIOExceptionRetryLimit,
           mIOExceptionRetryInterval);

    auto customizedParams = options.GetBuildConfig(true).GetCustomizedParams();
    int64_t checkRtBuildDumpLimit = ParamsUtil::GetParam<int64_t>("check_rt_build_dump_limit", customizedParams, -1);
    int64_t highMemUsedWatermark = ParamsUtil::GetParam<int64_t>("high_mem_used_watermark", customizedParams, -1);
    int64_t lowMemUsedWatermark = ParamsUtil::GetParam<int64_t>("low_mem_used_watermark", customizedParams, -1);
    mNeedCheckRealTimeBuild = ParamsUtil::GetParam<bool>("need_check_rt_build", customizedParams, true);
    if (checkRtBuildDumpLimit != -1) {
        mCheckRtBuildDumpLimit = max(int64_t(0), checkRtBuildDumpLimit);
    }
    if (highMemUsedWatermark != -1) {
        mHighMemUsedWatermark = highMemUsedWatermark;
    }
    if (lowMemUsedWatermark != -1) {
        mLowMemUsedWatermark = lowMemUsedWatermark;
    }
    IE_LOG(INFO,
           "Check rt build dump limit = [%zu], "
           "high mem used watermark = [%zu], low mem used watermark = [%zu]",
           mCheckRtBuildDumpLimit, mHighMemUsedWatermark, mLowMemUsedWatermark);
    IndexPartition::OpenStatus openStatus =
        IndexPartition::Open(primaryDir, secondaryDir, schema, options, targetVersionId);

    if (openStatus != OS_OK) {
        return openStatus;
    }
    mReaderContainer.reset(new ReaderContainer);
    if (!mRealtimeQuotaController) {
        mRealtimeQuotaController.reset(
            new MemoryQuotaController(options.GetOnlineConfig().maxRealtimeMemSize * 1024 * 1024));
    }
    mRtMemQuotaSynchronizer = CreateRealtimeMemoryQuotaSynchronizer(mRealtimeQuotaController);
    mClosed = true;

    Version onDiskVersion;
    VersionLoader::GetVersionS(secondaryDir, onDiskVersion, targetVersionId);

    auto versionDpDesc =
        CreateVersionDeployDescription(primaryDir, mOptions.GetOnlineConfig(), onDiskVersion.GetVersionId());
    if (versionDpDesc == nullptr) {
        string schemaName = (schema == nullptr) ? "NULL-schema" : schema->GetSchemaName();
        IE_LOG(ERROR, "create version deploy description failed in table[%s], versionId[%d]", schemaName.c_str(),
               static_cast<int>(onDiskVersion.GetVersionId()));
        return IndexPartition::OS_FAIL;
    }
    auto versionLevelLifecycleTable = versionDpDesc->GetLifecycleTable();
    if (versionLevelLifecycleTable == nullptr) {
        IE_LOG(ERROR, "get lifecycle Table from versionDpDesc error, versionId[%d]",
               static_cast<int>(onDiskVersion.GetVersionId()));
        return IndexPartition::OS_FAIL;
    }
    if (unlikely(onDiskVersion.GetVersionId() == INVALID_VERSIONID)) {
        const std::string& root = secondaryDir;
        THROW_IF_FS_ERROR(mFileSystem->MountFile(root, SCHEMA_FILE_NAME, SCHEMA_FILE_NAME, FSMT_READ_ONLY, -1, false),
                          "mount schema file failed");
        THROW_IF_FS_ERROR(mFileSystem->MountFile(root, INDEX_FORMAT_VERSION_FILE_NAME, INDEX_FORMAT_VERSION_FILE_NAME,
                                                 FSMT_READ_ONLY, -1, false),
                          "mount index format version file failed");
    } else {
        if (!mOptions.GetOnlineConfig().NeedReadRemoteIndex()) {
            THROW_IF_FS_ERROR(mFileSystem->MountVersion(primaryDir, onDiskVersion.GetVersionId(), "", FSMT_READ_ONLY,
                                                        versionLevelLifecycleTable),
                              "mount version failed");
        } else {
            THROW_IF_FS_ERROR(mFileSystem->MountVersion(secondaryDir, onDiskVersion.GetVersionId(), "", FSMT_READ_ONLY,
                                                        versionLevelLifecycleTable),
                              "mount version failed");
        }
    }
    ScopedLock lock(mDataLock);
    ScopedLock schemaLock(mSchemaLock);
    IE_PREFIX_LOG(INFO, "current free memory is %ld MB", mPartitionMemController->GetFreeQuota() / (1024 * 1024));

    // rt build disable package file
    mOptions.GetBuildConfig(true).enablePackageFile = false;

    mSchema = SchemaAdapter::LoadSchema(GetFileSystemRootDirectory(), onDiskVersion.GetSchemaVersionId());
    CheckParam(mSchema, mOptions);
    if (schema) {
        mSchema->AssertCompatible(*schema);
    }
    mTableWriterMemController.reset(
        new BlockMemoryQuotaController(mPartitionMemController, mSchema->GetSchemaName() + "_TableWriterMemory"));

    mOnlinePartMetrics->RegisterMetrics(mSchema->GetTableType());
    mCounterMap.reset(new CounterMap());
    if (mOptions.GetOnlineConfig().loadRemainFlushRealtimeIndex && !mOptions.TEST_mReadOnly) {
        // first open will recover rt index
        mOptions.GetOnlineConfig().enableRecoverIndex = true;
    }
    mPluginManager = TablePluginLoader::Load(mIndexPluginPath, mSchema, mOptions);
    if (!mPluginManager) {
        IE_LOG(ERROR, "load index plugin failed for schema [%s]", mSchema->GetSchemaName().c_str());
        return OS_FAIL;
    }
    if (!mTableFactory) {
        mTableFactory.reset(new TableFactoryWrapper(mSchema, mOptions, mPluginManager));
        if (!mTableFactory->Init()) {
            IE_LOG(ERROR, "Init TableFactory failed in table[%s]", mSchema->GetSchemaName().c_str());
            return OS_FAIL;
        }
    }
    PartitionDataPtr newPartitionData =
        CreateCustomPartitionData(onDiskVersion, index_base::Version(INVALID_VERSIONID), versionDpDesc,
                                  mDumpSegmentQueue, GetReclaimTimestamp(onDiskVersion), false);
    mPartitionDataHolder.Reset(newPartitionData);
    // only recover once when open
    mOptions.GetOnlineConfig().enableRecoverIndex = false;

    mFileSystem->SwitchLoadSpeedLimit(
        mOptions.GetOnlineConfig().enableLoadSpeedControlForOpen); // true for on, false for off

    if (!mOptions.TEST_mReadOnly) {
        mWriter.reset(new CustomOnlinePartitionWriter(mOptions, mTableFactory, mFlushedLocatorContainer,
                                                      mOnlinePartMetrics.get(), mPartitionName));
        InitWriter(mPartitionDataHolder.Get(), mWriter);
    }
    InitResourceCalculator(mPartitionDataHolder.Get(), mWriter);
    PartitionDataPtr clonedPartitionData(mPartitionDataHolder.Get()->Clone());
    auto reader =
        InitReaderWithMemoryLimit(clonedPartitionData, mWriter, index_base::Version(INVALID_VERSIONID), onDiskVersion);

    if (reader) {
        auto tableReader = reader->GetTableReader();
        mSupportSegmentLevelReader = tableReader->SupportSegmentLevelReader();
    }
    // remove inc covered segment
    CustomPartitionDataPtr typedData = DYNAMIC_POINTER_CAST(CustomPartitionData, newPartitionData);
    typedData->RemoveObsoleteSegments();

    SwitchReader(reader);
    MEMORY_BARRIER();

    mFileSystem->SwitchLoadSpeedLimit(true);
    mLoadedIncVersion = mPartitionDataHolder.Get()->GetOnDiskVersion();

    InitResourceCleaner();
    CleanPartitionReaders();
    CleanResource();
    ReportMetrics();

    if (!mReaderUpdater) {
        mReaderUpdater.reset(new SearchReaderUpdater(mSchema->GetSchemaName()));
    }

    if (!PrepareIntervalTask()) {
        return OS_FAIL;
    }
    mClosed = false;
    mIsServiceNormal = true;
    IE_PREFIX_LOG(INFO, "Open Index Partition End");
    return OS_OK;
}