aios/apps/facility/swift/admin/AdminZkDataAccessor.cpp (1,054 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "swift/admin/AdminZkDataAccessor.h" #include <algorithm> #include <assert.h> #include <limits> #include <memory> #include <sstream> #include <stddef.h> #include <utility> #include "aios/apps/facility/cm2/cm_basic/util/zk_wrapper.h" #include "aios/network/http_arpc/ProtoJsonizer.h" #include "autil/StringUtil.h" #include "autil/TimeUtility.h" #include "fslib/fs/FileSystem.h" #include "swift/common/PathDefine.h" #include "swift/protocol/AdminRequestResponse.pb.h" #include "swift/protocol/Common.pb.h" #include "swift/protocol/Heartbeat.pb.h" #include "swift/protocol/MessageComparator.h" #include "swift/util/LogicTopicHelper.h" #include "swift/util/TargetRecorder.h" #include "zookeeper/zookeeper.h" using namespace swift::protocol; using namespace swift::common; using namespace swift::util; using namespace fslib::util; using namespace http_arpc; using namespace std; using namespace autil; namespace swift { namespace admin { AUTIL_LOG_SETUP(swift, AdminZkDataAccessor); static const int LEADER_HISTORY_MAX_COUNT = 100; AdminZkDataAccessor::AdminZkDataAccessor() : _recordLocalFile(0) {} AdminZkDataAccessor::~AdminZkDataAccessor() {} bool AdminZkDataAccessor::init(cm_basic::ZkWrapper *zkWrapper, const string &sysRoot) { _zkPath = PathDefine::getPathFromZkPath(sysRoot); return ZkDataAccessor::init(zkWrapper, sysRoot) && mkZkDirs(); } bool AdminZkDataAccessor::init(const string &sysRoot) { _zkPath = PathDefine::getPathFromZkPath(sysRoot); return ZkDataAccessor::init(sysRoot); } bool AdminZkDataAccessor::mkZkDirs() { const string &adminDir = PathDefine::getAdminDir(_zkPath); if (!createPath(adminDir)) { AUTIL_LOG(ERROR, "create topic dir [%s] failed.", adminDir.c_str()); return false; } const string &topicDir = PathDefine::getTopicDir(_zkPath); if (!createPath(topicDir)) { AUTIL_LOG(ERROR, "create topic dir [%s] failed.", topicDir.c_str()); return false; } const string &taskDir = PathDefine::getTaskDir(_zkPath); if (!createPath(taskDir)) { AUTIL_LOG(ERROR, "create topic dir [%s] failed.", taskDir.c_str()); return false; } const string &heartbeatDir = PathDefine::heartbeatMonitorAddress(_zkPath); if (!createPath(heartbeatDir)) { AUTIL_LOG(ERROR, "create topic dir [%s] failed.", heartbeatDir.c_str()); return false; } const string &schemaDir = PathDefine::getSchemaDir(_zkPath); if (!createPath(schemaDir)) { AUTIL_LOG(ERROR, "create topic dir [%s] failed.", taskDir.c_str()); return false; } const string &noUseTopicDir = PathDefine::getNoUseTopicDir(_zkPath); if (!createPath(noUseTopicDir)) { AUTIL_LOG(ERROR, "create not use topic dir [%s] failed.", taskDir.c_str()); return false; } return true; } TopicPartitionInfos AdminZkDataAccessor::getTopicPartitionInfos() const { return _topicPartitionInfos; } bool AdminZkDataAccessor::loadTopicInfos(TopicMetas &topicMetas, TopicPartitionInfos &topicPartInfos) { const string &path = PathDefine::getTopicMetaFile(_zkPath); bool exist = false; if (!_zkWrapper->check(path, exist)) { AUTIL_LOG(ERROR, "failed to check whether file exists."); return false; } if (exist) { return loadTopicInfosFromNewFiles(topicMetas, topicPartInfos); } else { return loadTopicInfosFromOldFiles(topicMetas, topicPartInfos); } } bool AdminZkDataAccessor::loadTopicInfosFromNewFiles(TopicMetas &topicMetas, TopicPartitionInfos &topicPartInfos) { if (!readTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "read topic metas from zk failed."); return false; } if (!readTopicPartitionInfos(topicPartInfos)) { AUTIL_LOG(WARN, "read partition infos from zk failed, need recover."); } else { _topicPartitionInfos = topicPartInfos; } _topicMetas = topicMetas; return true; } bool AdminZkDataAccessor::recoverPartitionInfos(const TopicMetas &topicMetas, TopicPartitionInfos &partitionInfos) { partitionInfos.clear_topicpartitioninfos(); for (int32_t i = 0; i < topicMetas.topicmetas_size(); i++) { const TopicCreationRequest &topicMeta = topicMetas.topicmetas(i); TopicPartitionInfo *topicPartitionInfo = partitionInfos.add_topicpartitioninfos(); topicPartitionInfo->set_topicname(topicMeta.topicname()); for (uint32_t i = 0; i < topicMeta.partitioncount(); ++i) { PartitionInfo *pi = topicPartitionInfo->add_partitioninfos(); pi->set_id(i); pi->set_brokeraddress(""); pi->set_status(PARTITION_STATUS_NONE); } } if (!writeTopicPartitionInfos(partitionInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); return false; } _topicPartitionInfos = partitionInfos; return true; } bool AdminZkDataAccessor::addTopic(const TopicCreationRequest &topicMeta) { TopicBatchCreationRequest metas; *metas.add_topicrequests() = topicMeta; return addTopics(metas); } void AdminZkDataAccessor::addTopicToMetas(const TopicCreationRequest &request, TopicMetas &metas, TopicPartitionInfos &partInfos) { TopicPartitionInfo topicPartitionInfo; topicPartitionInfo.set_topicname(request.topicname()); PartitionStatus status = (TOPIC_TYPE_LOGIC == request.topictype()) ? PARTITION_STATUS_RUNNING : PARTITION_STATUS_NONE; for (uint32_t i = 0; i < request.partitioncount(); ++i) { PartitionInfo *pi = topicPartitionInfo.add_partitioninfos(); pi->set_id(i); pi->set_brokeraddress(""); pi->set_status(status); } *metas.add_topicmetas() = request; *partInfos.add_topicpartitioninfos() = topicPartitionInfo; } bool AdminZkDataAccessor::addTopics(const TopicBatchCreationRequest &batchTopicMetas) { TopicMetas topicMetas = _topicMetas; TopicPartitionInfos topicPartitionInfos = _topicPartitionInfos; string logTopicNames; for (int idx = 0; idx < batchTopicMetas.topicrequests_size(); ++idx) { auto &topicMeta = batchTopicMetas.topicrequests(idx); logTopicNames += topicMeta.topicname() + ","; addTopicToMetas(topicMeta, topicMetas, topicPartitionInfos); } if (!writeTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "write topic metas to zk failed."); return false; } if (!writeTopicPartitionInfos(topicPartitionInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); } _topicMetas = topicMetas; _topicPartitionInfos = topicPartitionInfos; AUTIL_LOG(INFO, "add topic name [%s]", logTopicNames.c_str()); return true; } //只有L和PL的topic会需要调用此接口 bool AdminZkDataAccessor::addPhysicTopic(const TopicCreationRequest &request, int64_t brokerCfgTTlSec, TopicCreationRequest &retLogicTopicMeta, TopicCreationRequest &retLastPhysicTopicMeta, TopicCreationRequest &retNewPhysicTopicMeta) { if (TOPIC_TYPE_LOGIC != request.topictype() && TOPIC_TYPE_LOGIC_PHYSIC != request.topictype()) { AUTIL_LOG(ERROR, "topic type[%s] cannot add physic topic, request[%s]", TopicType_Name(request.topictype()).c_str(), request.ShortDebugString().c_str()); return false; } TopicMetas topicMetas = _topicMetas; TopicPartitionInfos topicPartitionInfos = _topicPartitionInfos; int64_t curTime = TimeUtility::currentTime(); // PL topic的part count代表第一个物理topic的属性, 不能修改 const string &newPhysicTopicName = LogicTopicHelper::genPhysicTopicName(request.topicname().c_str(), curTime, request.partitioncount()); // find logic and modify logic topic bool found = false; string lastPhyTopicName; for (int idx = 0; idx < topicMetas.topicmetas_size(); ++idx) { TopicCreationRequest *meta = topicMetas.mutable_topicmetas(idx); if (request.topicname() == meta->topicname()) { // last physic topic int phyLen = meta->physictopiclst_size(); lastPhyTopicName = (phyLen > 0) ? meta->physictopiclst(phyLen - 1) : meta->topicname(); // add physic topic *meta->add_physictopiclst() = newPhysicTopicName; meta->set_modifytime(curTime); if (TOPIC_TYPE_LOGIC == meta->topictype()) { meta->set_partitioncount(request.partitioncount()); auto *logicPinfo = topicPartitionInfos.mutable_topicpartitioninfos(idx); if (logicPinfo->topicname() == meta->topicname()) { logicPinfo->clear_partitioninfos(); for (uint32_t i = 0; i < request.partitioncount(); ++i) { PartitionInfo *pi = logicPinfo->add_partitioninfos(); pi->set_id(i); pi->set_status(PARTITION_STATUS_RUNNING); } } else { AUTIL_LOG(WARN, "zk id[%d]'s meta[%s] and partinfo[%s] topic dismatch", idx, meta->topicname().c_str(), logicPinfo->topicname().c_str()); } } TopicCreationRequest physicMeta = request; physicMeta.clear_physictopiclst(); physicMeta.set_topicname(newPhysicTopicName); physicMeta.set_topictype(TOPIC_TYPE_PHYSIC); physicMeta.set_createtime(curTime); physicMeta.set_modifytime(curTime); physicMeta.set_topicexpiredtime(-1); physicMeta.set_enablettldel(false); physicMeta.set_sealed(false); addTopicToMetas(physicMeta, topicMetas, topicPartitionInfos); // set return value retLogicTopicMeta = *meta; retNewPhysicTopicMeta = physicMeta; found = true; break; } } if (!found) { AUTIL_LOG(ERROR, "not found logic topic[%s], add physic topic fail", request.topicname().c_str()); return false; } // 上一个物理topic的expire time设上 for (int idx = 0; idx < topicMetas.topicmetas_size(); ++idx) { TopicCreationRequest *meta = topicMetas.mutable_topicmetas(idx); if (lastPhyTopicName == meta->topicname()) { int64_t topicExpiredTimeSec = LogicTopicHelper::getPhysicTopicExpiredTime(curTime, meta->obsoletefiletimeinterval(), brokerCfgTTlSec); meta->set_modifytime(curTime); meta->set_topicexpiredtime(topicExpiredTimeSec); retLastPhysicTopicMeta = *meta; } } if (!writeTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "write topic metas to zk failed"); return false; } if (!writeTopicPartitionInfos(topicPartitionInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); } _topicMetas = topicMetas; _topicPartitionInfos = topicPartitionInfos; AUTIL_LOG(INFO, "add physic topic[%s] success", newPhysicTopicName.c_str()); return true; } bool AdminZkDataAccessor::modifyTopic(const TopicCreationRequest &newMeta) { TopicMetas topicMetas = _topicMetas; TopicPartitionInfos topicPartitionInfos = _topicPartitionInfos; const string &topicName = newMeta.topicname(); bool found = false; for (int i = 0; i < topicMetas.topicmetas_size(); i++) { protocol::TopicCreationRequest *topicMetaPtr = topicMetas.mutable_topicmetas(i); if (topicMetaPtr->topicname() == topicName) { *topicMetaPtr = newMeta; found = true; break; } } if (!found) { AUTIL_LOG(ERROR, "topic[%s] does not exist", topicName.c_str()); return false; } bool needUpdatePartitonInfo = false; for (int i = 0; i < topicPartitionInfos.topicpartitioninfos_size(); i++) { TopicPartitionInfo *partInfo = topicPartitionInfos.mutable_topicpartitioninfos(i); if (partInfo->topicname() == topicName && partInfo->partitioninfos_size() != (int32_t)newMeta.partitioncount()) { needUpdatePartitonInfo = true; partInfo->Clear(); partInfo->set_topicname(topicName); for (uint32_t k = 0; k < newMeta.partitioncount(); ++k) { PartitionInfo *pi = partInfo->add_partitioninfos(); pi->set_id(k); pi->set_brokeraddress(""); pi->set_status(PARTITION_STATUS_NONE); } break; } } if (!writeTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "write topic metas to zk failed."); return false; } if (needUpdatePartitonInfo && !writeTopicPartitionInfos(topicPartitionInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); } _topicMetas = topicMetas; if (needUpdatePartitonInfo) { _topicPartitionInfos = topicPartitionInfos; } AUTIL_LOG(INFO, "modify topic name [%s]", newMeta.topicname().c_str()); return true; } void AdminZkDataAccessor::getUpdateAndDelTopics(const map<string, TopicCreationRequest> &todelTopicMetas, map<string, TopicCreationRequest> &updateTopicMetas, set<string> &deleteTopics) { for (auto &item : todelTopicMetas) { const TopicCreationRequest &meta = item.second; if (TOPIC_TYPE_NORMAL == meta.topictype()) { deleteTopics.insert(meta.topicname()); } else if (TOPIC_TYPE_LOGIC == meta.topictype()) { // delete logic and all physic topics deleteTopics.insert(meta.topicname()); for (int i = 0; i < meta.physictopiclst_size(); ++i) { deleteTopics.insert(meta.physictopiclst(i)); } } else if (TOPIC_TYPE_LOGIC_PHYSIC == meta.topictype()) { // only delete first itself physic topic if (0 == meta.physictopiclst_size()) { AUTIL_LOG(ERROR, "LP topic[%s] delete P will have no physic topic, " "not delete", meta.topicname().c_str()); continue; } // update oldest physic topic's enableTTLDel TopicCreationRequest physicMeta; if (!getTopicMeta(meta.physictopiclst(0), physicMeta)) { AUTIL_LOG(ERROR, "LP topic[%s] cannot find physic[%s] meta", meta.ShortDebugString().c_str(), meta.physictopiclst(0).c_str()); continue; } TopicCreationRequest lastMeta; if (!getTopicMeta(meta.physictopiclst(meta.physictopiclst_size() - 1), lastMeta)) { AUTIL_LOG(ERROR, "LP topic[%s] get last physic meta failed", meta.ShortDebugString().c_str()); continue; } physicMeta.set_enablettldel(true); updateTopicMetas[physicMeta.topicname()] = physicMeta; // update logic topic's topic type updateTopicMetas[meta.topicname()] = meta; TopicCreationRequest &logicMeta = updateTopicMetas[meta.topicname()]; logicMeta.set_partitioncount(lastMeta.partitioncount()); logicMeta.set_topictype(TOPIC_TYPE_LOGIC); logicMeta.set_topicexpiredtime(-1); logicMeta.set_sealed(false); } else if (TOPIC_TYPE_PHYSIC == meta.topictype()) { // delete physic topic and update its logic topic string logicName; TopicCreationRequest logicMeta; if (!LogicTopicHelper::getLogicTopicName(meta.topicname(), logicName) || !getTopicMeta(logicName, logicMeta)) { AUTIL_LOG(WARN, "physic[%s] cannot find logic topic", meta.topicname().c_str()); continue; } if (TOPIC_TYPE_LOGIC_PHYSIC == logicMeta.topictype() || logicMeta.physictopiclst(0) != meta.topicname()) { AUTIL_LOG(ERROR, "cannot delete middle physic topic[%s], meta[%s]", meta.topicname().c_str(), logicMeta.ShortDebugString().c_str()); continue; } if (1 == logicMeta.physictopiclst_size()) { AUTIL_LOG(ERROR, "cannot delete one only physic topic[%s] from[%s]", meta.topicname().c_str(), logicMeta.ShortDebugString().c_str()); continue; } TopicCreationRequest nextPhysicMeta; if (!getTopicMeta(logicMeta.physictopiclst(1), nextPhysicMeta)) { AUTIL_LOG(ERROR, "logic[%s]'s physic[%s] cannot get next physic[%s] meta", logicMeta.ShortDebugString().c_str(), meta.topicname().c_str(), logicMeta.physictopiclst(1).c_str()); continue; } nextPhysicMeta.set_enablettldel(true); updateTopicMetas[nextPhysicMeta.topicname()] = nextPhysicMeta; // delete physic topic deleteTopics.insert(meta.topicname()); // update logic topic's physicTopicList updateTopicMetas[logicName] = logicMeta; TopicCreationRequest &updateMeta = updateTopicMetas[logicName]; updateMeta.mutable_physictopiclst()->Clear(); for (int i = 1; i < logicMeta.physictopiclst_size(); ++i) { *updateMeta.add_physictopiclst() = logicMeta.physictopiclst(i); } } } } bool AdminZkDataAccessor::deleteTopicsAllType(const map<string, TopicCreationRequest> &todelTopicMetas, set<string> &deletedTopics, vector<TopicCreationRequest> &updatedTopicMetas) { map<string, TopicCreationRequest> toUpdateMetas; set<string> toDeleteTopics; getUpdateAndDelTopics(todelTopicMetas, toUpdateMetas, toDeleteTopics); return deleteTopic(toDeleteTopics, toUpdateMetas, deletedTopics, updatedTopicMetas); } bool AdminZkDataAccessor::deleteTopic(const set<string> &topicNames) { map<string, TopicCreationRequest> toUpdateMetas; set<string> deletedTopics; vector<TopicCreationRequest> updatedMetas; return deleteTopic(topicNames, toUpdateMetas, deletedTopics, updatedMetas); } bool AdminZkDataAccessor::deleteTopic(const set<string> &toDeleteTopics, const map<string, TopicCreationRequest> &toUpdateMetas, set<string> &deletedTopics, vector<TopicCreationRequest> &updatedMetas) { TopicMetas topicMetas; TopicPartitionInfos topicPartitionInfos; assert(_topicMetas.topicmetas_size() == _topicPartitionInfos.topicpartitioninfos_size()); bool found = false; for (int i = 0; i < _topicMetas.topicmetas_size(); i++) { const TopicCreationRequest &meta = _topicMetas.topicmetas(i); const TopicPartitionInfo &partInfo = _topicPartitionInfos.topicpartitioninfos(i); if (toDeleteTopics.count(meta.topicname()) > 0) { found = true; deletedTopics.insert(meta.topicname()); continue; } const auto iter = toUpdateMetas.find(meta.topicname()); if (toUpdateMetas.end() != iter) { found = true; *topicMetas.add_topicmetas() = iter->second; updatedMetas.emplace_back(iter->second); TopicPartitionInfo *newPartInfo = topicPartitionInfos.add_topicpartitioninfos(); newPartInfo->set_topicname(iter->second.topicname()); if (TOPIC_TYPE_LOGIC_PHYSIC == meta.topictype() && TOPIC_TYPE_LOGIC == iter->second.topictype()) { for (uint32_t i = 0; i < iter->second.partitioncount(); ++i) { PartitionInfo *pi = newPartInfo->add_partitioninfos(); pi->set_id(i); pi->set_status(PARTITION_STATUS_RUNNING); } } else { *newPartInfo = partInfo; } } else { *topicMetas.add_topicmetas() = meta; *topicPartitionInfos.add_topicpartitioninfos() = partInfo; } } if (!found) { AUTIL_LOG(ERROR, "topics [%s] does not exist", StringUtil::toString(toDeleteTopics.begin(), toDeleteTopics.end(), ",").c_str()); return false; } if (!writeTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "write topic metas to zk failed."); return false; } if (!writeTopicPartitionInfos(topicPartitionInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); } string deletedTopicsStr; for (const auto &topicName : deletedTopics) { removeTopicSchema(topicName); deletedTopicsStr += topicName + ","; } _topicMetas = topicMetas; _topicPartitionInfos = topicPartitionInfos; AUTIL_LOG(INFO, "delete topic name[%s]", deletedTopicsStr.c_str()); return true; } bool AdminZkDataAccessor::setTopicPartitionInfos(const TopicPartitionInfos &topicPartInfos) { if (!writeTopicPartitionInfos(topicPartInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); return false; } _topicPartitionInfos = topicPartInfos; return true; } bool AdminZkDataAccessor::writeTopicMetas(const TopicMetas &topicMetas) { static int64_t writeCount = 0; writeCount++; string topicMetaPath = PathDefine::getTopicMetaFile(_zkPath); if (_recordLocalFile > 0 && writeCount % _recordLocalFile == 0) { string content; topicMetas.SerializeToString(&content); TargetRecorder::newAdminTopic(content); } return writeZk(topicMetaPath, topicMetas, true); } void AdminZkDataAccessor::setTopicMetas(const TopicMetas &topicMetas) { _topicMetas = topicMetas; } bool AdminZkDataAccessor::writeTopicPartitionInfos(const TopicPartitionInfos &topicPartitionInfos) { static int64_t writeCount = 0; writeCount++; string topicPartitionInfoPath = PathDefine::getPartitionInfoFile(_zkPath); if (_recordLocalFile > 0 && writeCount % _recordLocalFile == 0) { string content; topicPartitionInfos.SerializeToString(&content); TargetRecorder::newAdminPartition(content); } return writeZk(topicPartitionInfoPath, topicPartitionInfos, true); } bool AdminZkDataAccessor::readTopicMetas(TopicMetas &topicMetas) { string path = PathDefine::getTopicMetaFile(_zkPath); return readZk(path, topicMetas, true); } bool AdminZkDataAccessor::readTopicPartitionInfos(TopicPartitionInfos &topicPartitionInfos) { string path = PathDefine::getPartitionInfoFile(_zkPath); return readZk(path, topicPartitionInfos, true); } bool AdminZkDataAccessor::getTopicMeta(const string &topicName, TopicCreationRequest &topicMeta) { bool found = false; for (int i = 0; i < _topicMetas.topicmetas_size(); i++) { const TopicCreationRequest &meta = _topicMetas.topicmetas(i); if (meta.topicname() == topicName) { found = true; topicMeta = meta; break; } } if (!found) { AUTIL_LOG(ERROR, "topic[%s] does not exist", topicName.c_str()); return false; } return true; } bool AdminZkDataAccessor::loadTopicInfosFromOldFiles(TopicMetas &topicMetas, TopicPartitionInfos &topicPartInfos) { string topicDir = PathDefine::getTopicDir(_zkPath); bool exist = false; if (!_zkWrapper->check(topicDir, exist)) { AUTIL_LOG(ERROR, "fail to check topic dir[%s]", topicDir.c_str()); return false; } if (!exist) { AUTIL_LOG(INFO, "topic dir[%s] not exist", topicDir.c_str()); return true; } vector<string> topics; if (!getAllTopicNameOld(topics)) { AUTIL_LOG(ERROR, "fail to get topic names"); return false; } sort(topics.begin(), topics.end()); for (size_t i = 0; i < topics.size(); i++) { TopicCreationRequest topicMeta; TopicPartitionInfo topicPartitionInfo; if (!getTopicMetaOld(topics[i], topicMeta)) { AUTIL_LOG(ERROR, "fail to get topic meta[%s]", topics[i].c_str()); return false; } TopicInfo topicInfo; if (!getTopicInfoOld(topics[i], topicInfo)) { AUTIL_LOG(ERROR, "fail to get topic info[%s]", topics[i].c_str()); return false; } for (int pid = 0; pid < topicInfo.partitioninfos_size(); pid++) { *topicPartitionInfo.add_partitioninfos() = topicInfo.partitioninfos(pid); } topicPartitionInfo.set_topicname(topics[i]); *topicMetas.add_topicmetas() = topicMeta; *topicPartInfos.add_topicpartitioninfos() = topicPartitionInfo; } if (topicMetas.topicmetas_size() > 0) { if (!writeTopicMetas(topicMetas)) { AUTIL_LOG(ERROR, "write topic metas to zk failed."); return false; } if (!writeTopicPartitionInfos(topicPartInfos)) { AUTIL_LOG(ERROR, "write topic partition infos to zk failed."); return false; } } if (!_zkWrapper->remove(topicDir)) { AUTIL_LOG(WARN, "erase old topics dir on zk failed"); } _topicMetas = topicMetas; _topicPartitionInfos = topicPartInfos; return true; } bool AdminZkDataAccessor::getTopicMetaOld(const string &topicName, TopicCreationRequest &meta) const { string path = PathDefine::getTopicMetaPath(_zkPath, topicName); return readZk(path, meta); } bool AdminZkDataAccessor::getTopicInfoOld(const string &topicName, TopicInfo &topicInfo) const { string path = PathDefine::getTopicPartitionDir(_zkPath, topicName); vector<string> partitions; ZOO_ERRORS ec = _zkWrapper->getChild(path, partitions); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir %s.", path.c_str()); return false; } topicInfo.set_name(topicName); topicInfo.set_status(TOPIC_STATUS_NONE); for (size_t i = 0; i < partitions.size(); i++) { PartitionInfo *pi = topicInfo.add_partitioninfos(); int pid = StringUtil::fromString<uint32_t>(partitions[i]); string path = PathDefine::getTopicPartitionPath(_zkPath, topicName, pid); readZk(path, *pi); pi->set_id(pid); } return true; } bool AdminZkDataAccessor::getAllTopicNameOld(vector<string> &names) const { string path = PathDefine::getTopicDir(_zkPath); ZOO_ERRORS ec = _zkWrapper->getChild(path, names); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir %s.", path.c_str()); return false; } return true; } bool AdminZkDataAccessor::setDispatchedTask(const DispatchInfo &dispatchInfo) { return writeZk(PathDefine::getTaskInfoPath(_zkPath, dispatchInfo.rolename()), dispatchInfo); } bool AdminZkDataAccessor::getDispatchedTask(const string &roleName, DispatchInfo &dispatchInfo) { return readZk(PathDefine::getTaskInfoPath(_zkPath, roleName), dispatchInfo); } bool AdminZkDataAccessor::setLeaderInfo(const LeaderInfo &leaderInfo) { if (!writeZk(PathDefine::getLeaderInfoFile(_zkPath), leaderInfo)) { return false; } string jsonPath = PathDefine::getLeaderInfoJsonFile(_zkPath); if (!writeZk(jsonPath, leaderInfo, false, true)) { AUTIL_LOG(ERROR, "write leader json file Failed [%s].", jsonPath.c_str()); return false; } return true; } bool AdminZkDataAccessor::getLeaderInfo(LeaderInfo &leaderInfo) const { return readZk(PathDefine::getLeaderInfoFile(_zkPath), leaderInfo); } bool AdminZkDataAccessor::addLeaderInfoToHistory(const LeaderInfo &leaderInfo) { string path = PathDefine::getLeaderHistoryFile(_zkPath); LeaderInfoHistory leaderInfoHistory; LeaderInfo *newLeaderInfo = leaderInfoHistory.add_leaderinfos(); newLeaderInfo->CopyFrom(leaderInfo); bool exist = false; if (_zkWrapper->check(path, exist) && exist) { LeaderInfoHistory oldLeaderInfoHistory; readZk(path, oldLeaderInfoHistory); for (int i = 0; i < min(oldLeaderInfoHistory.leaderinfos().size(), LEADER_HISTORY_MAX_COUNT - 1); i++) { LeaderInfo *newLeaderInfo = leaderInfoHistory.add_leaderinfos(); newLeaderInfo->CopyFrom(oldLeaderInfoHistory.leaderinfos(i)); } } if (!writeZk(path, leaderInfoHistory)) { return false; } return true; } bool AdminZkDataAccessor::getHistoryLeaders(LeaderInfoHistory &historyLeaders) const { return readZk(PathDefine::getLeaderHistoryFile(_zkPath), historyLeaders); } bool AdminZkDataAccessor::readConfigVersion(std::string &versionStr) { string path = PathDefine::getConfigVersionFile(_zkPath); ZOO_ERRORS ec = _zkWrapper->getData(path, versionStr); if (ZOK != ec) { AUTIL_LOG(ERROR, "fail to get data %s", path.c_str()); return false; } return true; } bool AdminZkDataAccessor::writeConfigVersion(const std::string &versionStr) { string path = PathDefine::getConfigVersionFile(_zkPath); if (!writeFile(path, versionStr)) { AUTIL_LOG(ERROR, "fail to write data %s", path.c_str()); return false; } return true; } bool AdminZkDataAccessor::readLeaderAddress(const std::string &roleName, std::string &leaderAddress) { string address; string path = PathDefine::getLeaderFilePath(_zkPath, roleName); ZOO_ERRORS ec = _zkWrapper->getData(path, address); if (ZOK != ec) { AUTIL_LOG(ERROR, "read leader address for role [%s] failed, path [%s].", roleName.c_str(), path.c_str()); return false; } // support leader file have both rpc and http address const vector<string> &addrVec = StringUtil::split(address, "\n"); if (addrVec.empty()) { leaderAddress = address; } else { leaderAddress = addrVec[0]; } return true; } bool AdminZkDataAccessor::writeLeaderAddress(const string &roleName, const string &leaderAddress) { string path = PathDefine::getLeaderFilePath(_zkPath, roleName); bool isSuc = writeFile(path, leaderAddress); if (!isSuc) { AUTIL_LOG(ERROR, "write leader address for role [%s] failed, path [%s].", roleName.c_str(), path.c_str()); } return isSuc; } bool AdminZkDataAccessor::setBrokerVersionInfos(const RoleVersionInfos &roleVersionInfos) { if (_versionInfos == roleVersionInfos) { return true; } const string &path = PathDefine::getBrokerVersionsFile(_zkPath); bool isSuc = writeZk(path, roleVersionInfos); if (!isSuc) { AUTIL_LOG(ERROR, "write broker version failed, path [%s].", path.c_str()); } else { _versionInfos = roleVersionInfos; } return isSuc; } bool AdminZkDataAccessor::getBrokerVersionInfos(RoleVersionInfos &roleVersionInfos) { const string &path = PathDefine::getBrokerVersionsFile(_zkPath); bool isSuc = readZk(path, roleVersionInfos); if (!isSuc) { AUTIL_LOG(ERROR, "write broker version failed, path [%s].", path.c_str()); } return isSuc; } bool AdminZkDataAccessor::readTopicMetas(string &content) { const string &path = PathDefine::getTopicMetaFile(_zkPath); ZOO_ERRORS ec = _zkWrapper->getData(path, content); if (ZOK != ec) { AUTIL_LOG(ERROR, "fail to get data %s", path.c_str()); return false; } return true; } bool AdminZkDataAccessor::loadTopicSchemas() { bool exist = false; const string &path = PathDefine::getSchemaDir(_zkPath); if (!_zkWrapper->check(path, exist)) { AUTIL_LOG(ERROR, "fail to check schema dir[%s]", path.c_str()); return false; } if (!exist) { AUTIL_LOG(ERROR, "schema dir[%s] not exist", path.c_str()); return false; } vector<string> topics; ZOO_ERRORS ec = _zkWrapper->getChild(path, topics); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir %s.", path.c_str()); return false; } map<string, SchemaInfos> topicSchemas; for (size_t index = 0; index < topics.size(); index++) { SchemaInfos schemaInfos; const string &path = PathDefine::getTopicSchemaFile(_zkPath, topics[index]); if (!readZk(path, schemaInfos, true)) { AUTIL_LOG(ERROR, "fail to get topic schema[%s], continue", topics[index].c_str()); continue; } if (0 != schemaInfos.sinfos_size()) { topicSchemas.insert({topics[index], schemaInfos}); AUTIL_LOG(INFO, "load topic[%s] schema success", topics[index].c_str()); } } _topicSchemas = topicSchemas; return true; } bool AdminZkDataAccessor::getOldestSchemaVersion(const SchemaInfos &scmInfos, int32_t &version, int &pos) { uint32_t oldestTime = std::numeric_limits<uint32_t>::max(); for (int64_t index = 0; index < scmInfos.sinfos_size(); ++index) { if (oldestTime > scmInfos.sinfos(index).registertime()) { oldestTime = scmInfos.sinfos(index).registertime(); version = scmInfos.sinfos(index).version(); pos = index; } } return true; } bool AdminZkDataAccessor::writeTopicSchema(const std::string &topicName, const SchemaInfos &scmInfos) { const string &path = PathDefine::getTopicSchemaFile(_zkPath, topicName); bool isSuc = writeZk(path, scmInfos, true); if (!isSuc) { AUTIL_LOG(ERROR, "write topic[%s] schema failed, path[%s]", topicName.c_str(), path.c_str()); } return isSuc; } bool AdminZkDataAccessor::addTopicSchema(const string &topicName, int32_t version, const string &schema, int32_t &removedVersion, uint32_t maxAllowSchemaNum) { SchemaInfo si; si.set_version(version); si.set_registertime((uint32_t)TimeUtility::currentTimeInSeconds()); si.set_schema(schema); SchemaInfos scmInfos = _topicSchemas[topicName]; if ((int)maxAllowSchemaNum <= scmInfos.sinfos_size()) { int index = 0; if (getOldestSchemaVersion(scmInfos, removedVersion, index)) { AUTIL_LOG(INFO, "remove obsolete version[%d]", removedVersion); *scmInfos.mutable_sinfos(index) = si; } } else { *scmInfos.add_sinfos() = si; } if (!writeTopicSchema(topicName, scmInfos)) { AUTIL_LOG(ERROR, "write topic schema to zk failed."); return false; } _topicSchemas[topicName] = scmInfos; AUTIL_LOG(INFO, "add topic[%s] schema[%s] version[%d]", topicName.c_str(), schema.c_str(), version); return true; } bool AdminZkDataAccessor::getSchema(const string &topicName, int32_t version, SchemaInfo &schemaInfo) { const auto iter = _topicSchemas.find(topicName); if (_topicSchemas.end() == iter) { AUTIL_LOG(WARN, "topic[%s] schema not found", topicName.c_str()); return false; } bool retValue = false; const SchemaInfos &infos = iter->second; if (0 == version) { // need latest uint32_t latestTime = 0; for (int64_t index = 0; index < infos.sinfos_size(); ++index) { if (latestTime < infos.sinfos(index).registertime()) { latestTime = infos.sinfos(index).registertime(); schemaInfo = infos.sinfos(index); retValue = true; } } } else { for (int64_t index = 0; index < infos.sinfos_size(); ++index) { if (version == infos.sinfos(index).version()) { schemaInfo = infos.sinfos(index); retValue = true; } } } return retValue; } bool AdminZkDataAccessor::removeTopicSchema(const string &topicName) { const auto iter = _topicSchemas.find(topicName); if (_topicSchemas.end() == iter) { return true; } const string &path = PathDefine::getTopicSchemaFile(_zkPath, topicName); if (remove(path)) { _topicSchemas.erase(iter); AUTIL_LOG(INFO, "remove topic[%s] schema success", topicName.c_str()); } return true; } bool AdminZkDataAccessor::writeTopicRWTime(const TopicRWInfos &rwInfos) { const string &rwFile = PathDefine::getTopicRWFile(_zkPath); return writeZk(rwFile, rwInfos, true); } bool AdminZkDataAccessor::loadTopicRWTime(TopicRWInfos &rwInfos) { const string &path = PathDefine::getTopicRWFile(_zkPath); return readZk(path, rwInfos, true); } bool AdminZkDataAccessor::writeDeletedNoUseTopics(const TopicMetas &metas, uint32_t maxAllowNum) { const string &noUseTopicDir = PathDefine::getNoUseTopicDir(_zkPath); vector<string> topicFiles; ZOO_ERRORS ec = _zkWrapper->getChild(noUseTopicDir, topicFiles); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir %s.", noUseTopicDir.c_str()); return false; } if (topicFiles.size() >= maxAllowNum) { size_t deleteCnt = topicFiles.size() - maxAllowNum + 1; std::sort(topicFiles.begin(), topicFiles.end()); //增序 for (size_t i = 0; i < deleteCnt; ++i) { const string &oldestFile = fslib::fs::FileSystem::joinFilePath(noUseTopicDir, topicFiles[i]); if (_zkWrapper->deleteNode(oldestFile)) { AUTIL_LOG(INFO, "delete oldest NoUseTopic file[%s] success", oldestFile.c_str()); } else { AUTIL_LOG(ERROR, "delete oldest NoUseTopic file[%s] fail", oldestFile.c_str()); } } } const string &newTopicFile = fslib::fs::FileSystem::joinFilePath(noUseTopicDir, StringUtil::toString(TimeUtility::currentTimeInSeconds())); AUTIL_LOG(INFO, "write deleted NoUseTopic file[%s]", newTopicFile.c_str()); return writeZk(newTopicFile, metas, true); } bool AdminZkDataAccessor::loadLastDeletedNoUseTopics(TopicMetas &metas) { string lastFileName; if (!getLastDeletedNoUseTopicFileName(lastFileName)) { return false; } if (lastFileName.empty()) { return true; } return loadDeletedNoUseTopics(lastFileName, metas); } bool AdminZkDataAccessor::getLastDeletedNoUseTopicFileName(string &fileName) { vector<string> topicFiles; if (!loadDeletedNoUseTopicFiles(topicFiles)) { return false; } if (0 == topicFiles.size()) { AUTIL_LOG(INFO, "deleted NoUseTopic dir empty"); return true; } std::sort(topicFiles.begin(), topicFiles.end()); //增序 fileName = topicFiles[topicFiles.size() - 1]; return true; } bool AdminZkDataAccessor::loadDeletedNoUseTopics(const string &fileName, TopicMetas &metas) { const string &noUseTopicDir = PathDefine::getNoUseTopicDir(_zkPath); const string &noUseFile = fslib::fs::FileSystem::joinFilePath(noUseTopicDir, fileName); return readZk(noUseFile, metas, true); } bool AdminZkDataAccessor::loadDeletedNoUseTopicFiles(vector<string> &topicFiles) { const string &noUseTopicDir = PathDefine::getNoUseTopicDir(_zkPath); ZOO_ERRORS ec = _zkWrapper->getChild(noUseTopicDir, topicFiles); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir[%s]", noUseTopicDir.c_str()); return false; } return true; } bool AdminZkDataAccessor::loadChangePartCntTask() { const string &path = PathDefine::getChangePartCntTaskFile(_zkPath); bool exist = false; if (!_zkWrapper->check(path, exist)) { AUTIL_LOG(WARN, "failed to check file[%s] exists", path.c_str()); return false; } if (!exist) { AUTIL_LOG(INFO, "change part count task[%s] not exist", path.c_str()); return true; } protocol::ChangeTopicPartCntTasks changePartCntTasks; bool retVal = readZk(path, changePartCntTasks, true); if (retVal) { AUTIL_LOG( INFO, "load change partcnt task[%s] success, task size[%d]", path.c_str(), changePartCntTasks.tasks_size()); } else { AUTIL_LOG(ERROR, "load change partcnt task[%s] fail", path.c_str()); return false; } ScopedWriteLock lock(_changePartCntTasksLock); _changePartCntTasks = changePartCntTasks; return retVal; } ErrorCode AdminZkDataAccessor::sendChangePartCntTask(const TopicCreationRequest &request) { ScopedWriteLock lock(_changePartCntTasksLock); ChangeTopicPartCntTasks tasks = _changePartCntTasks; for (int idx = tasks.tasks_size() - 1; idx >= 0; --idx) { const auto &meta = tasks.tasks(idx).meta(); if (request.topicname() == meta.topicname()) { AUTIL_LOG( ERROR, "topic[%s] has task[%s] doing..", request.topicname().c_str(), meta.ShortDebugString().c_str()); return ERROR_ADMIN_CHG_PART_TASK_NOT_FINISH; } } ChangeTopicPartCntTask *taskPtr = tasks.add_tasks(); taskPtr->set_taskid(request.modifytime()); *taskPtr->mutable_meta() = request; const string &tasksFilePath = PathDefine::getChangePartCntTaskFile(_zkPath); if (!writeZk(tasksFilePath, tasks, true)) { AUTIL_LOG(ERROR, "write change partcnt task[%s] to zk failed", request.ShortDebugString().c_str()); return ERROR_ADMIN_OPERATION_FAILED; } _changePartCntTasks = tasks; AUTIL_LOG(INFO, "write change partcnt task[%s] to zk success, task size[%d]", request.ShortDebugString().c_str(), tasks.tasks_size()); return ERROR_NONE; } void AdminZkDataAccessor::updateChangePartCntTasks(const set<int64_t> &finishedIds) { if (0 == finishedIds.size()) { return; } ScopedWriteLock lock(_changePartCntTasksLock); ChangeTopicPartCntTasks newTasks; for (int idx = _changePartCntTasks.tasks_size() - 1; idx >= 0; --idx) { const auto &task = _changePartCntTasks.tasks(idx); if (finishedIds.find(task.meta().modifytime()) == finishedIds.end()) { *newTasks.add_tasks() = task; } } const string &tasksFilePath = PathDefine::getChangePartCntTaskFile(_zkPath); if (!writeZk(tasksFilePath, newTasks, true)) { AUTIL_LOG(ERROR, "write change partcnt task[%s] to zk failed", newTasks.ShortDebugString().c_str()); } else { _changePartCntTasks = newTasks; AUTIL_LOG(INFO, "update change partcnt tasks success, task size[%d]", newTasks.tasks_size()); } } const ChangeTopicPartCntTasks AdminZkDataAccessor::getChangePartCntTasks() const { ScopedReadLock lock(_changePartCntTasksLock); return _changePartCntTasks; } bool AdminZkDataAccessor::writeMasterVersion(uint64_t targetVersion) { const string &path = PathDefine::getSelfMasterVersionFile(_zkPath); if (!writeFile(path, StringUtil::toString(targetVersion))) { AUTIL_LOG(ERROR, "fail to write data %s", path.c_str()); return false; } return true; } bool AdminZkDataAccessor::readMasterVersion(uint64_t &masterVersion) { const string &path = PathDefine::getSelfMasterVersionFile(_zkPath); bool exist = false; if (!_zkWrapper->check(path, exist)) { AUTIL_LOG(ERROR, "fail to check master version file[%s]", path.c_str()); return false; } if (!exist) { AUTIL_LOG(INFO, "master version file[%s] not exist", path.c_str()); return true; } std::string content; auto ec = _zkWrapper->getData(path, content); if (ZOK != ec) { AUTIL_LOG(ERROR, "fail to get data %s", path.c_str()); return false; } if (!StringUtil::strToUInt64(content.c_str(), masterVersion)) { AUTIL_LOG(ERROR, "fail to convert version[%s] to uint64", content.c_str()); return false; } return true; } bool AdminZkDataAccessor::serializeCleanAtDeleteTasks(const protocol::CleanAtDeleteTopicTasks &tasks) { string toDelTopicsPath = PathDefine::getCleanAtDeleteTaskFile(_zkPath); if (!writeZk(toDelTopicsPath, tasks, true, false)) { AUTIL_LOG(WARN, "write clean at delete topic plan fail"); return false; } return true; } bool AdminZkDataAccessor::loadCleanAtDeleteTopicTasks(CleanAtDeleteTopicTasks &tasks) { const string &path = PathDefine::getCleanAtDeleteTaskFile(_zkPath); bool exist = false; if (!_zkWrapper->check(path, exist)) { AUTIL_LOG(WARN, "failed to check file[%s] exists", path.c_str()); return false; } if (!exist) { AUTIL_LOG(INFO, "clean at delete task[%s] not exist", path.c_str()); return true; } bool retVal = readZk(path, tasks, true, false); if (retVal) { AUTIL_LOG(INFO, "load clean at delete task[%s] success, task size[%d]", path.c_str(), tasks.tasks_size()); } else { AUTIL_LOG(ERROR, "load clean at delete task[%s] fail", path.c_str()); return false; } return retVal; } void AdminZkDataAccessor::removeOldWriterVersionData(const std::set<std::string> &topicNames) { if (topicNames.empty()) { return; } const std::string &zkPath = PathDefine::getWriteVersionControlPath(_zkPath); std::vector<std::string> topics; ZOO_ERRORS ec = _zkWrapper->getChild(zkPath, topics); if (ZOK != ec) { AUTIL_LOG(ERROR, "Failed to list dir[%s]", zkPath.c_str()); return; } for (const auto &topic : topics) { if (topicNames.end() == topicNames.find(topic)) { const std::string &path = fslib::fs::FileSystem::joinFilePath(zkPath, topic); if (_zkWrapper->remove(path)) { AUTIL_LOG(INFO, "remove topic[%s] writer version success", topic.c_str()); } else { AUTIL_LOG(WARN, "remove topic[%s] writer version fail", topic.c_str()); } } } } } // namespace admin } // namespace swift