aios/suez/common/TableMeta.h (472 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <algorithm> #include <cstdio> #include <iosfwd> #include <iterator> #include <map> #include <memory> #include <set> #include <stdint.h> #include <string> #include <utility> #include <vector> #include "autil/Lock.h" #include "autil/legacy/jsonizable.h" #include "indexlib/indexlib.h" #include "suez/common/InnerDef.h" #include "suez/sdk/PartitionId.h" #include "suez/sdk/SuezError.h" #include "suez/sdk/SuezPartitionType.h" #include "suez/sdk/TableDefConfig.h" #include "suez/sdk/TableUserDefinedMeta.h" namespace suez { typedef schemaid_t SchemaVersion; // tableIndexPath/TableA/generation_0/partition_0_1/version.0 enum TableMode { TM_NORMAL, TM_READ_WRITE, }; struct TableMeta : public autil::legacy::Jsonizable { public: TableMeta() : mode(TM_NORMAL) , tableType(SPT_NONE) , rtStatus(TRS_BUILDING) , forceOnline(false) , timestampToSkip(-1) , totalPartitionCount(-1) {} ~TableMeta() {} public: void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json) { json.Jsonize("table_mode", mode, mode); json.Jsonize("table_type", tableType, tableType); json.Jsonize("rt_status", rtStatus, rtStatus); // in c/s separate mode, offline dfs root, for deploy json.Jsonize("raw_index_root", rawIndexRoot, rawIndexRoot); json.Jsonize("force_online", forceOnline, forceOnline); json.Jsonize("timestamp_to_skip", timestampToSkip, timestampToSkip); json.Jsonize("total_partition_count", totalPartitionCount, totalPartitionCount); json.Jsonize("user_defined_param", userDefinedParam, userDefinedParam); // 兼容target json.Jsonize("index_root", indexRoot, indexRoot); json.Jsonize("config_path", configPath, configPath); } static bool needUpdateRt(const TableMeta &target, const TableMeta &current) { return target.rtStatus != current.rtStatus || target.timestampToSkip > current.timestampToSkip || (target.forceOnline && !current.forceOnline); } public: TableMode mode; SuezPartitionType tableType; TableRtStatus rtStatus; std::string rawIndexRoot; bool forceOnline; int64_t timestampToSkip; int32_t totalPartitionCount; autil::legacy::json::JsonMap userDefinedParam; std::string indexRoot; std::string configPath; }; struct DeployInfo : public autil::legacy::Jsonizable { DeployInfo(DeployStatus ds = DS_UNKNOWN) : deployStatus(ds) {} void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json) { json.Jsonize("deploy_status", deployStatus, deployStatus); } DeployStatus deployStatus; }; /* EffectiveFieldInfo表示表可以提供服务的字段集合 */ struct EffectiveFieldInfo : public autil::legacy::Jsonizable { public: EffectiveFieldInfo() {} public: void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json) { json.Jsonize("attributes", attributes, attributes); json.Jsonize("indexes", indexes, indexes); } public: void clear() { if (!attributes.empty()) { attributes.clear(); } if (!indexes.empty()) { indexes.clear(); } } void sort() { std::sort(attributes.begin(), attributes.end()); std::sort(indexes.begin(), indexes.end()); } bool contains(const EffectiveFieldInfo &target, const PartitionId &pid) const; public: std::vector<std::string> attributes; std::vector<std::string> indexes; }; struct DeployMeta { public: DeployMeta() {} ~DeployMeta() {} public: DeployMeta(const DeployMeta &other) { *this = other; } DeployMeta &operator=(const DeployMeta &other); public: bool hasDeployingIncVersion() const { autil::ScopedLock lock(_mutex); for (const auto &kv : deployStatusMap) { if (kv.second.deployStatus == DS_DEPLOYING) { return true; } } return false; } void setDeployStatus(IncVersion incVersion, DeployStatus status) { autil::ScopedLock lock(_mutex); deployStatusMap[incVersion].deployStatus = status; } IncVersion getLatestDeployVersion() const; DeployStatus getLatestDeployStatus() const; DeployStatus getDeployStatus(IncVersion incVersion) const { autil::ScopedLock lock(_mutex); auto iter = deployStatusMap.find(incVersion); if (iter == deployStatusMap.end()) { return DS_UNKNOWN; } return iter->second.deployStatus; } std::set<IncVersion> getNeedKeepVersions(IncVersion incVersion) const { std::set<IncVersion> versions; autil::ScopedLock lock(_mutex); versions.insert(incVersion); for (const auto &it : deployStatusMap) { if (it.first >= incVersion) { versions.insert(it.first); } } return versions; } void clearIncVersion(const std::set<IncVersion> &inUseVersions); std::string getConfigPath() const { autil::ScopedLock lock(_mutex); return configPath; } void setConfigPath(const std::string &configPath_) { autil::ScopedLock lock(_mutex); configPath = configPath_; } void setIndexRoot(const std::string &indexRoot_) { autil::ScopedLock lock(_mutex); indexRoot = indexRoot_; } std::string getIndexRoot() const { autil::ScopedLock lock(_mutex); return indexRoot; } void setRawIndexRoot(const std::string &rawIndexRoot_) { rawIndexRoot = rawIndexRoot_; } std::string getRawIndexRoot() const { autil::ScopedLock lock(_mutex); if (!rawIndexRoot.empty()) { return rawIndexRoot; } return indexRoot; } std::map<IncVersion, DeployInfo> getDeployStatusMap() { autil::ScopedLock lock(_mutex); return deployStatusMap; } private: mutable autil::ThreadMutex _mutex; std::map<IncVersion, DeployInfo> deployStatusMap; std::string configPath; std::string indexRoot; std::string rawIndexRoot; }; struct PartitionMeta : public autil::legacy::Jsonizable { public: PartitionMeta() : incVersion(indexlib::INVALID_VERSIONID) , schemaVersion(0) , rollbackTimestamp(0) , branchId(0) , keepCount(1) , configKeepCount(1) , tableLoadType(TLT_TRY_LOAD_FIRST) , tableStatus(TS_UNKNOWN) , roleType(RT_FOLLOWER) , fullIndexLoaded(false) { deployMeta = std::make_shared<DeployMeta>(); } ~PartitionMeta() {} public: PartitionMeta(const PartitionMeta &other) { *this = other; } PartitionMeta &operator=(const PartitionMeta &other); void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json); bool validate() const; void setRtStatus(TableRtStatus status) { autil::ScopedLock lock(_mutex); tableMeta.rtStatus = status; } void setForceOnline(bool online) { autil::ScopedLock lock(_mutex); tableMeta.forceOnline = online; } bool getForceOnline() const { autil::ScopedLock lock(_mutex); return tableMeta.forceOnline; } void setTimestampToSkip(int64_t timestamp) { autil::ScopedLock lock(_mutex); tableMeta.timestampToSkip = timestamp; } int64_t getTimestampToSkip() const { autil::ScopedLock lock(_mutex); return tableMeta.timestampToSkip; } TableRtStatus getRtStatus() const { autil::ScopedLock lock(_mutex); return tableMeta.rtStatus; } void setTableStatus(TableStatus status) { autil::ScopedLock lock(_mutex); tableStatus = status; } TableStatus getTableStatus() const { autil::ScopedLock lock(_mutex); return tableStatus; } bool hasDeployingIncVersion() const { return deployMeta->hasDeployingIncVersion(); } void setDeployStatus(IncVersion incVersion, DeployStatus status) { deployMeta->setDeployStatus(incVersion, status); } IncVersion getLatestDeployVersion() const { return deployMeta->getLatestDeployVersion(); } DeployStatus getLatestDeployStatus() const { return deployMeta->getLatestDeployStatus(); } DeployStatus getDeployStatus(IncVersion incVersion) const { return deployMeta->getDeployStatus(incVersion); } std::set<IncVersion> getNeedKeepVersions() const { return deployMeta->getNeedKeepVersions(incVersion); } IncVersion getIncVersion() const { autil::ScopedLock lock(_mutex); return incVersion; } SchemaVersion getSchemaVersion() const { autil::ScopedLock lock(_mutex); return schemaVersion; } EffectiveFieldInfo getEffectiveFieldInfo() const { autil::ScopedLock lock(_mutex); return effectiveFieldInfo; } void setIncVersion(IncVersion incVersionId, SchemaVersion schemaVersionId = 0) { autil::ScopedLock lock(_mutex); incVersion = incVersionId; schemaVersion = schemaVersionId; } void setSchemaContent(std::string content) { autil::ScopedLock lock(_mutex); schemaContent = std::move(content); } void setRollbackTimestamp(int64_t rollbackTimestamp_) { autil::ScopedLock lock(_mutex); rollbackTimestamp = rollbackTimestamp_; } int64_t getRollbackTimestamp() const { autil::ScopedLock lock(_mutex); return rollbackTimestamp; } void setBranchId(uint64_t branchId_) { autil::ScopedLock lock(_mutex); branchId = branchId_; } uint64_t getBranchId() const { autil::ScopedLock lock(_mutex); return branchId; } const std::string &getSchemaContent() const { autil::ScopedLock lock(_mutex); return schemaContent; } void setEffectiveFieldInfo(const EffectiveFieldInfo &info) { autil::ScopedLock lock(_mutex); effectiveFieldInfo = info; } void clearEffectiveFieldInfo() { autil::ScopedLock lock(_mutex); effectiveFieldInfo.clear(); } size_t getKeepCount() const { autil::ScopedLock lock(_mutex); return keepCount; } void setKeepCount(size_t keepCount_) { autil::ScopedLock lock(_mutex); keepCount = keepCount_; } size_t getConfigKeepCount() const { autil::ScopedLock lock(_mutex); return configKeepCount; } void setConfigKeepCount(size_t configKeepCount_) { autil::ScopedLock lock(_mutex); configKeepCount = configKeepCount_; } TableLoadType getTableLoadType() const { autil::ScopedLock lock(_mutex); return tableLoadType; } void setTableLoadType(TableLoadType tableLoadType_) { autil::ScopedLock lock(_mutex); tableLoadType = tableLoadType_; } std::string getConfigPath() const { return deployMeta->getConfigPath(); } void setConfigPath(const std::string &configPath_) { deployMeta->setConfigPath(configPath_); } void setIndexRoot(const std::string &indexRoot_) { deployMeta->setIndexRoot(indexRoot_); } std::string getIndexRoot() const { return deployMeta->getIndexRoot(); } void setRawIndexRoot(const std::string &rawIndexRoot_) { deployMeta->setRawIndexRoot(rawIndexRoot_); } std::string getRawIndexRoot() const { return deployMeta->getRawIndexRoot(); } void setTotalPartitionCount(int32_t count) { autil::ScopedLock lock(_mutex); tableMeta.totalPartitionCount = count; } int32_t getTotalPartitionCount() const { autil::ScopedLock lock(_mutex); return tableMeta.totalPartitionCount; } void setTableMode(TableMode mode) { autil::ScopedLock lock(_mutex); tableMeta.mode = mode; } TableMode getTableMode() const { autil::ScopedLock lock(_mutex); return tableMeta.mode; } SuezPartitionType getTableType() const { autil::ScopedLock lock(_mutex); return tableMeta.tableType; } void setTableType(SuezPartitionType tableType) { autil::ScopedLock lock(_mutex); tableMeta.tableType = tableType; } void setTableMeta(const TableMeta &tableMeta) { autil::ScopedLock lock(_mutex); this->tableMeta = tableMeta; } const TableMeta &getTableMeta() const { autil::ScopedLock lock(_mutex); return tableMeta; } void setError(const SuezError &err) { autil::ScopedLock lock(_mutex); error = err; } void resetError(const SuezError &err) { autil::ScopedLock lock(_mutex); if (error == err) { error = ERROR_NONE; } } const SuezError &getError() const { autil::ScopedLock lock(_mutex); return error; } void setLoadedConfigPath(const std::string &loadedConfigPath_) { autil::ScopedLock lock(_mutex); loadedConfigPath = loadedConfigPath_; } const std::string &getLoadedConfigPath() const { autil::ScopedLock lock(_mutex); return loadedConfigPath; } void setLoadedIndexRoot(const std::string &loadedIndexRoot_) { autil::ScopedLock lock(_mutex); loadedIndexRoot = loadedIndexRoot_; } const std::string &getLoadedIndexRoot() const { autil::ScopedLock lock(_mutex); return loadedIndexRoot; } void setFullIndexLoaded(bool loaded) { autil::ScopedLock lock(_mutex); fullIndexLoaded = loaded; } bool getFullIndexLoaded() const { autil::ScopedLock lock(_mutex); return fullIndexLoaded; } void setRoleType(RoleType type) { autil::ScopedLock lock(_mutex); roleType = type; } void setTableDefConfig(const TableDefConfig &config) { autil::ScopedLock lock(_mutex); tableDefConfig = config; tableDefConfig.tableDist.partitionCnt = tableMeta.totalPartitionCount; } const TableDefConfig &getTableDefConfig() const { autil::ScopedLock lock(_mutex); return tableDefConfig; } RoleType getRoleType() const { autil::ScopedLock lock(_mutex); return roleType; } void clearIncVersion(const std::set<IncVersion> &inUseVersions); bool isTableInMemory() const { auto status = getTableStatus(); return status == TS_LOADING || status == TS_PRELOADING || status == TS_UNLOADING || status == TS_FORCELOADING || status == TS_LOADED; } const std::string &getCheckIndexPath() const { autil::ScopedLock lock(_mutex); return checkIndexPath; } /** * indicating table reaches target. */ bool isReady(bool allowForceLoad) const { autil::ScopedLock lock(_mutex); if (tableMeta.tableType != SPT_INDEXLIB) { return (tableStatus == TS_LOADED); } if (!fullIndexLoaded) { return false; } return ((tableStatus == TS_LOADED && (allowForceLoad ? loadedConfigPath == getConfigPath() : true)) || tableStatus == TS_PRELOADING || tableStatus == TS_PRELOAD_FAILED || tableStatus == TS_PRELOAD_FORCE_RELOAD); } /** * indicating table is serving, IndexProvider can be gotten. * for index partition ONLY. */ bool isAvailable(bool allowForceLoad, const PartitionMeta &target, const PartitionId &pid) const; private: mutable autil::ThreadMutex _mutex; TableMeta tableMeta; std::shared_ptr<DeployMeta> deployMeta; // dp状态共享 避免role switch的时候 dp状态不同步 IncVersion incVersion; // loaded incVersion SchemaVersion schemaVersion; int64_t rollbackTimestamp; uint64_t branchId; // indexlibv2回滚使用,给一个不同的branchId后会根据rollbackTimestamp和incVersion来决定回滚到那个版本 EffectiveFieldInfo effectiveFieldInfo; size_t keepCount; size_t configKeepCount; TableLoadType tableLoadType; // 存储计算分离场景使用。在线需要等索引分发到在线盘古上version写出来。checkIndexPath就是分发完成的标记路径。 std::string checkIndexPath; std::string loadedConfigPath; std::string loadedIndexRoot; std::string schemaContent; TableStatus tableStatus; SuezError error; RoleType roleType; bool fullIndexLoaded; // only for SuezIndexPartition TableDefConfig tableDefConfig; }; typedef std::shared_ptr<PartitionMeta> PartitionMetaPtr; typedef PartitionMeta TargetPartitionMeta; typedef PartitionMeta CurrentPartitionMeta; typedef PartitionMetaPtr TargetPartitionMetaPtr; typedef PartitionMetaPtr CurrentPartitionMetaPtr; bool operator==(const DeployInfo &left, const DeployInfo &right); bool operator!=(const DeployInfo &left, const DeployInfo &right); bool operator==(const TableMeta &left, const TableMeta &right); bool operator!=(const TableMeta &left, const TableMeta &right); bool operator==(const PartitionMeta &left, const PartitionMeta &right); bool operator!=(const PartitionMeta &left, const PartitionMeta &right); class TableMetas : public std::map<PartitionId, PartitionMeta>, public autil::legacy::Jsonizable { public: void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json); void GetTableUserDefinedMetas(TableUserDefinedMetas &userDefinedMetas) const; }; // for gtest ::std::ostream &operator<<(::std::ostream &os, const TableId &tableId); ::std::ostream &operator<<(::std::ostream &os, const TableMeta &tableMeta); ::std::ostream &operator<<(::std::ostream &os, const PartitionId &partitionId); ::std::ostream &operator<<(::std::ostream &os, const PartitionMeta &partitionMeta); ::std::ostream &operator<<(::std::ostream &os, const TableMetas &tableMetas); ::std::ostream &operator<<(::std::ostream &os, const DeployInfo &deployInfo); } // namespace suez