aios/apps/facility/swift/network/SwiftAdminAdapter.cpp (942 lines of code) (raw):

/* * Copyright 2014-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "swift/network/SwiftAdminAdapter.h" #include <algorithm> #include <assert.h> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/stubs/port.h> #include <iosfwd> #include <stdlib.h> #include <time.h> #include <unistd.h> #include <utility> #include <vector> #include "autil/TimeUtility.h" #include "google/protobuf/io/zero_copy_stream_impl_lite.h" #include "swift/common/PathDefine.h" #include "swift/network/ClientFileUtil.h" #include "swift/network/SwiftAdminClient.h" #include "swift/network/SwiftRpcChannelManager.h" #include "swift/protocol/AdminRequestResponse.pb.h" #include "swift/protocol/Common.pb.h" #include "swift/protocol/ErrCode.pb.h" #include "swift/protocol/Heartbeat.pb.h" using namespace std; using namespace autil; using namespace google::protobuf; using namespace google::protobuf::io; using namespace swift::protocol; using namespace swift::common; namespace swift { namespace network { AUTIL_LOG_SETUP(swift, SwiftAdminAdapter); static const int64_t LATELY_ERROR_TIME_MIN_INTERVAL = 300 * 1000; // 0.3 s static const int64_t LATELY_ERROR_TIME_MAX_INTERVAL = 10000 * 1000; // 10 s SwiftAdminAdapter::SwiftAdminAdapter(const string &zkRootPath, const SwiftRpcChannelManagerPtr &channelManager, bool useFollowerAdmin, int64_t timeout, int64_t refreshTime) : _zkRootPath(zkRootPath) , _useFollowerAdmin(useFollowerAdmin) , _channelManager(channelManager) , _lastGeneralErrorCode(ERROR_NONE) , _lastGeneralErrorTime(-1) , _timeout(timeout) , _refreshTime(refreshTime) { _latelyErrorTimeMaxInterval = LATELY_ERROR_TIME_MAX_INTERVAL; _latelyErrorTimeMinInterval = LATELY_ERROR_TIME_MIN_INTERVAL; } SwiftAdminAdapter::~SwiftAdminAdapter() { resetAdminClient(); } #define CHECK_LATELY_ERROR \ int64_t now = TimeUtility::currentTime(); \ ErrorCode ec = ERROR_NONE; \ ec = getLatelyError(now); \ if (ERROR_NONE != ec) { \ AUTIL_LOG(WARN, "has laterly error [%s]!", ErrorCode_Name(ec).c_str()); \ return ec; \ } #define DO_ADMIN_COMMON_RPC(adminClient, METHOD, response) \ { \ if (!adminClient->METHOD(&request, &response)) { \ AUTIL_LOG(WARN, "swift admin arpc failed."); \ ec = ERROR_CLIENT_ARPC_ERROR; \ } \ \ if (response.errorinfo().errcode() != ERROR_NONE) { \ AUTIL_LOG(WARN, \ "swift admin error, errorcode:[%d], errormsg[%s].", \ response.errorinfo().errcode(), \ response.errorinfo().errmsg().c_str()); \ ec = response.errorinfo().errcode(); \ } \ } ErrorCode SwiftAdminAdapter::getBrokerAddress(const string &topicName, uint32_t partitionId, string &brokerAddress, protocol::AuthenticationInfo authInfo) { protocol::BrokerVersionInfo versionInfo; return getBrokerInfo(topicName, partitionId, brokerAddress, versionInfo, authInfo); } ErrorCode SwiftAdminAdapter::getBrokerInfo(const std::string &topicName, uint32_t partitionId, std::string &brokerAddress, BrokerVersionInfo &versionInfo, protocol::AuthenticationInfo authInfo) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doGetBrokerInfo(topicName, partitionId, brokerAddress, versionInfo, authInfo); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); if (ec != ERROR_NONE) { addErrorInfoToCacheTopic(topicName, partitionId, "unkonwn"); } return ec; } ErrorCode SwiftAdminAdapter::getLatelyError(int64_t now) { if ((ERROR_NONE == _lastGeneralErrorCode) || (_lastGeneralErrorTime + getLatelyErrorInterval() < now)) { return ERROR_NONE; } return _lastGeneralErrorCode; } int64_t SwiftAdminAdapter::getLatelyErrorInterval() { srand((uint64_t)time(NULL)); int64_t interval = _latelyErrorTimeMinInterval + (rand() % (_latelyErrorTimeMaxInterval - _latelyErrorTimeMinInterval + 1)); AUTIL_LOG(WARN, "lately error interval is [%ld]", interval); return interval; } void SwiftAdminAdapter::checkError(ErrorCode ec, int64_t now) { if (isGeneralError(ec)) { _lastGeneralErrorCode = ec; _lastGeneralErrorTime = now; } else { _lastGeneralErrorCode = ERROR_NONE; _lastGeneralErrorTime = -1; } } bool SwiftAdminAdapter::isGeneralError(ErrorCode ec) { return (ERROR_CLIENT_RPC_CONNECT == ec || ERROR_CLIENT_GET_ADMIN_INFO_FAILED == ec || ERROR_CLIENT_SYS_STOPPED == ec || ERROR_CLIENT_ARPC_ERROR == ec); } ErrorCode SwiftAdminAdapter::getAllTopicInfo(AllTopicInfoResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doGetAllTopicInfo(response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } void SwiftAdminAdapter::resetAdminClient() { _adminClient.reset(); _readAdminClient.reset(); } ErrorCode SwiftAdminAdapter::getPartitionInfo(const string &topicName, uint32_t partitionId, PartitionInfoResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doGetPartitionInfo(topicName, partitionId, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getTopicInfo(const string &topicName, TopicInfoResponse &response, int64_t topicVersion, protocol::AuthenticationInfo authInfo) { return getTopicInfoOpt(topicName, response, topicVersion, authInfo); } ErrorCode SwiftAdminAdapter::getTopicInfoWithLeader(const string &topicName, TopicInfoResponse &response, bool fromLeader, protocol::AuthenticationInfo authInfo) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doGetTopicInfo(topicName, response, fromLeader, authInfo); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getPartitionCount(const string &topicName, uint32_t &partitionCount) { uint32_t rangeCountInPartition = 0; return getPartitionCount(topicName, 0, partitionCount, rangeCountInPartition); } ErrorCode SwiftAdminAdapter::getPartitionCount(const string &topicName, int64_t topicVersion, uint32_t &partitionCount, uint32_t &rangeCountInPartition) { TopicInfoResponse response; ErrorCode ec = getTopicInfoOpt(topicName, response, topicVersion, {}); if (ERROR_NONE == ec) { partitionCount = (uint32_t)response.topicinfo().partitioncount(); rangeCountInPartition = (uint32_t)response.topicinfo().rangecountinpartition(); } return ec; } ErrorCode SwiftAdminAdapter::getTopicInfoOpt(const string &topicName, TopicInfoResponse &response, int64_t topicVersion, protocol::AuthenticationInfo authInfo) { TopicInfo cacheTopicInfo; bool fromLeader = false; bool cacheHit = getTopicInfoFromCache(topicName, cacheTopicInfo, fromLeader); if (cacheHit) { if (!cacheTopicInfo.has_modifytime() || cacheTopicInfo.modifytime() < topicVersion) // version is modifytime { AUTIL_LOG(INFO, "hit cache topic version[%ld] less than client " "version[%ld], now getTopicInfo from admin leader", cacheTopicInfo.modifytime(), topicVersion); TopicInfoResponse tmpResponse; ErrorCode ec = getTopicInfoWithLeader(topicName, tmpResponse, true, authInfo); if (ec != ERROR_NONE) { return ec; } if (!tmpResponse.has_topicinfo()) { AUTIL_LOG(WARN, "get topic[%s] info from leader failed, topic info empty", topicName.c_str()); return ERROR_CLIENT_INVALID_RESPONSE; } response = tmpResponse; } else { AUTIL_LOG( INFO, "hit cache topic version[%ld], client version[%ld]", cacheTopicInfo.modifytime(), topicVersion); TopicInfo *ti = response.mutable_topicinfo(); *ti = cacheTopicInfo; ErrorInfo *ei = response.mutable_errorinfo(); ei->set_errcode(ERROR_NONE); ei->set_errmsg(ErrorCode_Name(ei->errcode())); } } else { TopicInfoResponse tmpResponse; ErrorCode ec = getTopicInfoWithLeader(topicName, tmpResponse, false, authInfo); if (ec != ERROR_NONE) { return ec; } if (!tmpResponse.has_topicinfo()) { AUTIL_LOG(WARN, "get topic[%s] info from follow failed, topic info empty", topicName.c_str()); return ERROR_CLIENT_INVALID_RESPONSE; } if (tmpResponse.topicinfo().has_modifytime() && tmpResponse.topicinfo().modifytime() >= topicVersion && topicVersion > 0) { AUTIL_LOG(INFO, "follow admin topic version[%ld], client version[%ld]", tmpResponse.topicinfo().modifytime(), topicVersion); response = tmpResponse; } else { TopicInfoResponse tmpResponse; ErrorCode ec = getTopicInfoWithLeader(topicName, tmpResponse, true, authInfo); if (ec != ERROR_NONE) { return ec; } if (!tmpResponse.has_topicinfo()) { AUTIL_LOG(WARN, "get topic[%s] info from leader failed, topic info empty", topicName.c_str()); return ERROR_CLIENT_INVALID_RESPONSE; } AUTIL_LOG(INFO, "leader admin topic version[%ld], client version[%ld]", tmpResponse.topicinfo().modifytime(), topicVersion); response = tmpResponse; } } return ERROR_NONE; } ErrorCode SwiftAdminAdapter::createTopic(protocol::TopicCreationRequest &request) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doCreateTopic(request); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::createTopicBatch(TopicBatchCreationRequest &request, TopicBatchCreationResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, createTopicBatch, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } bool SwiftAdminAdapter::waitTopicReady(const string &topicName, protocol::AuthenticationInfo authInfo, int64_t timeout) { int64_t endTime = TimeUtility::currentTime() + timeout; while (TimeUtility::currentTime() < endTime) { TopicInfoResponse response; ErrorCode ec = getTopicInfo(topicName, response, 0, authInfo); if (ec == ERROR_NONE && response.topicinfo().status() == TOPIC_STATUS_RUNNING) { return true; } else { usleep(500 * 1000); } } return false; } ErrorCode SwiftAdminAdapter::deleteTopic(const std::string &topicName) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doDeleteTopic(topicName); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::deleteTopicBatch(TopicBatchDeletionRequest &request, TopicBatchDeletionResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, deleteTopicBatch, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::doCreateTopic(protocol::TopicCreationRequest &request) { TopicCreationResponse response; ErrorCode ec = ERROR_NONE; addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, createTopic, response); return ec; } ErrorCode SwiftAdminAdapter::doDeleteTopic(const std::string &topicName) { TopicDeletionRequest request; request.set_topicname(topicName); addRequestExtendInfo(request); TopicDeletionResponse response; ErrorCode ec = ERROR_NONE; DO_ADMIN_COMMON_RPC(_adminClient, deleteTopic, response); return ec; } ErrorCode SwiftAdminAdapter::createAdminClient() { string leaderInfoFile = common::PathDefine::getLeaderInfoFile(_zkRootPath); string leaderInfoContent; ClientFileUtil fileUtil; if (!fileUtil.readFile(leaderInfoFile, leaderInfoContent) || leaderInfoContent.empty()) { AUTIL_LOG(WARN, "get leader info content failed. zookeeper path[%s]", leaderInfoFile.c_str()); return ERROR_CLIENT_GET_ADMIN_INFO_FAILED; } LeaderInfo leaderInfo; ArrayInputStream stream((void *)leaderInfoContent.data(), leaderInfoContent.size()); if (!leaderInfo.ParseFromZeroCopyStream(&stream)) { AUTIL_LOG(WARN, "parse leader info failed!"); return ERROR_CLIENT_GET_ADMIN_INFO_FAILED; } if (leaderInfo.address().empty()) { AUTIL_LOG(WARN, "admin leader address is empty!"); return ERROR_CLIENT_GET_ADMIN_INFO_FAILED; } AUTIL_LOG(INFO, "leader info [%s]", leaderInfo.ShortDebugString().c_str()); if (leaderInfo.sysstop()) { AUTIL_LOG(WARN, "swift has stopped!"); return ERROR_CLIENT_SYS_STOPPED; } _adminClient.reset(new SwiftAdminClient(leaderInfo.address(), _channelManager, _timeout)); _readAdminClient = _adminClient; AUTIL_LOG(INFO, "create admin client to [%s] success!", leaderInfo.address().c_str()); if (_useFollowerAdmin) { vector<string> adminAddrVec; for (int i = 0; i < leaderInfo.admininfos_size(); i++) { AdminInfo *adminInfo = leaderInfo.mutable_admininfos(i); if (adminInfo->isalive() && !adminInfo->isprimary()) { adminAddrVec.push_back(adminInfo->address()); } } if (adminAddrVec.size() != 0) { int64_t curTime = TimeUtility::currentTime(); srand(curTime); size_t i = rand() % adminAddrVec.size(); AUTIL_LOG(INFO, "create admin client to [%s] success!", adminAddrVec[i].c_str()); _readAdminClient.reset(new SwiftAdminClient(adminAddrVec[i], _channelManager, _timeout)); } } return ERROR_NONE; } bool SwiftAdminAdapter::needResetAdminClient(ErrorCode ec) { return ec == ERROR_CLIENT_ARPC_ERROR || ec == ERROR_ADMIN_NOT_LEADER || ec == ERROR_ADMIN_IP_SET_INVALID; } ErrorCode SwiftAdminAdapter::doGetBrokerInfo(const string &topicName, uint32_t partitionId, string &brokerAddress, BrokerVersionInfo &versionInfo, protocol::AuthenticationInfo authInfo) { TopicInfo cacheTopicInfo; bool fromLeader; bool cacheHit = getTopicInfoFromCache(topicName, cacheTopicInfo, fromLeader); if (cacheHit) { AUTIL_LOG(INFO, "[%s %d] hit topic info cache, fromLeader[%d]", topicName.c_str(), partitionId, fromLeader); } else { AUTIL_LOG(INFO, "[%s %d] not hit topic info cache, now rpc follow admin", topicName.c_str(), partitionId); TopicInfoResponse response; ErrorCode ec = doGetTopicInfo(topicName, response, false, authInfo); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "get topic info[%s] failed, error[%s]", topicName.c_str(), ErrorCode_Name(ec).c_str()); return ec; } cacheTopicInfo = response.topicinfo(); } if (TOPIC_TYPE_LOGIC == cacheTopicInfo.topictype()) { AUTIL_LOG(WARN, "[%s %d] is logic topic, " "return ERROR_CLIENT_LOGIC_TOPIC_NOT_IN_BROKER", topicName.c_str(), partitionId); return ERROR_CLIENT_LOGIC_TOPIC_NOT_IN_BROKER; } if (cacheTopicInfo.partitioninfos_size() <= (int32_t)partitionId) { AUTIL_LOG(WARN, "cacheTopicInfo partition size[%d] < partitionId[%d], error[" "ERROR_CLIENT_GET_BROKER_ADDRESS_FAILED]", cacheTopicInfo.partitioninfos_size(), partitionId); return ERROR_CLIENT_GET_BROKER_ADDRESS_FAILED; } const PartitionInfo &partInfo = cacheTopicInfo.partitioninfos(partitionId); if (!partInfo.has_brokeraddress() || partInfo.brokeraddress().empty()) { AUTIL_LOG(WARN, "[%s %d] partition not be scheduled, error[" "ERROR_CLIENT_GET_BROKER_ADDRESS_FAILED]", topicName.c_str(), partitionId); return ERROR_CLIENT_GET_BROKER_ADDRESS_FAILED; } brokerAddress = partInfo.brokeraddress(); versionInfo = partInfo.versioninfo(); return ERROR_NONE; } ErrorCode SwiftAdminAdapter::doGetPartitionInfo(const string &topicName, uint32_t partitionId, PartitionInfoResponse &response) { assert(_readAdminClient); PartitionInfoRequest request; request.set_topicname(topicName); request.add_partitionids(partitionId); addRequestExtendInfo(request); int64_t begTime = TimeUtility::currentTime(); if (!_readAdminClient->getPartitionInfo(&request, &response)) { AUTIL_LOG(WARN, "swift admin arpc failed."); return ERROR_CLIENT_ARPC_ERROR; } int64_t endTime = TimeUtility::currentTime(); AUTIL_LOG(INFO, "real get partition [%s %d] info used [%ldus], content: [%s]", topicName.c_str(), (int)partitionId, endTime - begTime, response.ShortDebugString().c_str()); if (response.errorinfo().errcode() != ERROR_NONE) { AUTIL_LOG(WARN, "swift admin error, errorcode:[%d], errormsg[%s].", response.errorinfo().errcode(), response.errorinfo().errmsg().c_str()); return response.errorinfo().errcode(); } return ERROR_NONE; } ErrorCode SwiftAdminAdapter::doGetTopicInfo(const string &topicName, TopicInfoResponse &response, bool fromLeader, protocol::AuthenticationInfo authInfo) { assert(_adminClient); assert(_readAdminClient); TopicInfoRequest request; request.set_topicname(topicName); addRequestExtendInfo(request, authInfo); int64_t begTime = TimeUtility::currentTime(); if (fromLeader) { if (!_adminClient->getTopicInfo(&request, &response)) { AUTIL_LOG(WARN, "swift admin arpc failed"); return ERROR_CLIENT_ARPC_ERROR; } } else { if (!_readAdminClient->getTopicInfo(&request, &response)) { AUTIL_LOG(WARN, "swift admin arpc failed"); return ERROR_CLIENT_ARPC_ERROR; } } int64_t endTime = TimeUtility::currentTime(); AUTIL_LOG(INFO, "real get topic[%s] info used [%ld us], from [%d]", topicName.c_str(), endTime - begTime, int(fromLeader)); if (response.errorinfo().errcode() != ERROR_NONE) { AUTIL_LOG(WARN, "swift admin error, errorcode:[%s], errormsg[%s].", ErrorCode_Name(response.errorinfo().errcode()).c_str(), response.errorinfo().errmsg().c_str()); return response.errorinfo().errcode(); } if (response.has_topicinfo()) { putTopicInfoIntoCache(topicName, response.topicinfo(), fromLeader); } return ERROR_NONE; } ErrorCode SwiftAdminAdapter::doGetAllTopicInfo(AllTopicInfoResponse &response) { assert(_readAdminClient); EmptyRequest emptyRequest; addRequestExtendInfo(emptyRequest); int64_t begTime = TimeUtility::currentTime(); if (!_readAdminClient->getAllTopicInfo(&emptyRequest, &response)) { AUTIL_LOG(WARN, "swift admin arpc failed."); return ERROR_CLIENT_ARPC_ERROR; } int64_t endTime = TimeUtility::currentTime(); AUTIL_LOG(INFO, "real get all topic info used [%ld]", endTime - begTime); if (response.errorinfo().errcode() != ERROR_NONE) { AUTIL_LOG(WARN, "swift admin error, errorcode:[%d], errormsg[%s].", response.errorinfo().errcode(), response.errorinfo().errmsg().c_str()); return response.errorinfo().errcode(); } return ERROR_NONE; } bool SwiftAdminAdapter::getTopicInfoFromCache(const string &topicName, protocol::TopicInfo &topicInfo, bool &fromLeader) { ScopedLock lock(_cacheMutex); TopicInfoCache::iterator iter = _topicInfoCache.find(topicName); if (iter == _topicInfoCache.end()) { return false; } int64_t cacheTime = TimeUtility::currentTime() - iter->second.putTime; // need cache topic info even has error. if (cacheTime >= _refreshTime || (iter->second.hasError && cacheTime > min(_refreshTime, _latelyErrorTimeMinInterval))) { AUTIL_LOG(INFO, "need refresh [%s] clear topic info from cache , cacheTime [%ld], put time [%ld], error [%d], " "refresh time [%ld] ", topicName.c_str(), cacheTime, iter->second.putTime, iter->second.hasError, _refreshTime); _topicInfoCache.erase(iter); return false; } else { topicInfo = iter->second.topicInfo; fromLeader = iter->second.fromLeader; return true; } } void SwiftAdminAdapter::putTopicInfoIntoCache(const string &topicName, const protocol::TopicInfo &topicInfo, bool fromLeader) { ScopedLock lock(_cacheMutex); TopicInfoCacheItem cacheItem; cacheItem.putTime = TimeUtility::currentTime(); cacheItem.topicInfo = topicInfo; cacheItem.fromLeader = fromLeader; _topicInfoCache[topicName] = cacheItem; AUTIL_LOG(INFO, "[%s] cache topic info, put time [%ld] ", topicName.c_str(), cacheItem.putTime); } void SwiftAdminAdapter::addErrorInfoToCacheTopic(const std::string &topicName, uint32_t partitionId, const std::string &address) { ScopedLock lock(_cacheMutex); auto iter = _topicInfoCache.find(topicName); if (iter != _topicInfoCache.end() && !iter->second.hasError) { if (partitionId >= iter->second.topicInfo.partitioninfos_size()) { iter->second.hasError = true; AUTIL_LOG(INFO, "[%s %u] add error into topic info, put time [%ld], cur topic part count [%d]", topicName.c_str(), partitionId, iter->second.putTime, iter->second.topicInfo.partitioninfos_size()); return; } const PartitionInfo &partInfo = iter->second.topicInfo.partitioninfos(partitionId); size_t pos = address.find(":"); string rawAddress = pos != string::npos ? address.substr(pos + 1) : address; if (rawAddress == partInfo.brokeraddress() || partInfo.brokeraddress().empty() || rawAddress == "unkonwn") { iter->second.hasError = true; AUTIL_LOG(INFO, "[%s %u] add error into topic info, put time [%ld]", topicName.c_str(), partitionId, iter->second.putTime); } else { AUTIL_LOG(INFO, "[%s %u]add error, but try reuse for broker address has change from [%s] to [%s]", topicName.c_str(), partitionId, rawAddress.c_str(), partInfo.brokeraddress().c_str()); } } } ErrorCode SwiftAdminAdapter::registerSchema(RegisterSchemaRequest &request, protocol::RegisterSchemaResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, registerSchema, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getSchema(GetSchemaRequest &request, GetSchemaResponse &response, protocol::AuthenticationInfo authInfo) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request, authInfo); DO_ADMIN_COMMON_RPC(_readAdminClient, getSchema, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::reportBrokerStatus(BrokerStatusRequest &request, BrokerStatusResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, reportBrokerStatus, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getBrokerStatus(GetBrokerStatusRequest &request, GetBrokerStatusResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, getBrokerStatus, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getTopicRWTime(GetTopicRWTimeRequest &request, GetTopicRWTimeResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_readAdminClient, getTopicRWTime, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::getAllTopicName(TopicNameResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_readAdminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } EmptyRequest request; addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_readAdminClient, getAllTopicName, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } // for follow admin ErrorCode SwiftAdminAdapter::reportMissTopic(MissTopicRequest &request, MissTopicResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, reportMissTopic, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::sealTopic(const string &topicName) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } TopicCreationRequest request; TopicCreationResponse response; request.set_topicname(topicName); request.set_sealed(true); addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, modifyTopic, response); if (ERROR_NONE != ec) { AUTIL_LOG( ERROR, "seal topic[%s] fail, ec[%d], ret[%s]", topicName.c_str(), ec, response.ShortDebugString().c_str()); } else { AUTIL_LOG(INFO, "seal topic[%s] success", topicName.c_str()); } if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::changePartCount(const string &topicName, uint32_t newPartCount) { if (0 == newPartCount) { AUTIL_LOG(ERROR, "newPartCount:0 is invalid"); return ERROR_CLIENT_INVALID_PARAMETERS; } ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } TopicCreationRequest request; TopicCreationResponse response; request.set_topicname(topicName); request.set_partitioncount(newPartCount); addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, modifyTopic, response); if (ERROR_NONE != ec) { AUTIL_LOG(ERROR, "change topic[%s] part count fail, ec[%d], ret[%s]", topicName.c_str(), ec, response.ShortDebugString().c_str()); } else { AUTIL_LOG(INFO, "change topic[%s] part count to[%d] success", topicName.c_str(), newPartCount); } if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::updateWriterVersion(const protocol::TopicWriterVersionInfo &writerVersionInfo) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } ec = doUpdateWriterVersion(writerVersionInfo); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } ErrorCode SwiftAdminAdapter::doUpdateWriterVersion(const protocol::TopicWriterVersionInfo &writerVersionInfo) { ErrorCode ec = ERROR_NONE; UpdateWriterVersionRequest request; UpdateWriterVersionResponse response; *(request.mutable_topicwriterversion()) = writerVersionInfo; addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, updateWriterVersion, response); return ec; } void SwiftAdminAdapter::setLatelyErrorTimeMinIntervalUs(int64_t interval) { _latelyErrorTimeMinInterval = interval; } void SwiftAdminAdapter::setLatelyErrorTimeMaxIntervalUs(int64_t interval) { _latelyErrorTimeMaxInterval = interval; } void SwiftAdminAdapter::setAuthenticationConf(const auth::ClientAuthorizerInfo &conf) { _authConf = conf; } auth::ClientAuthorizerInfo SwiftAdminAdapter::getAuthenticationConf() { return _authConf; } protocol::ErrorCode SwiftAdminAdapter::modifyTopic(protocol::TopicCreationRequest &request, protocol::TopicCreationResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, modifyTopic, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } protocol::ErrorCode SwiftAdminAdapter::getMasterInfo(protocol::EmptyRequest &request, protocol::MasterInfoResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, getMasterInfo, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } protocol::ErrorCode SwiftAdminAdapter::topicAclManage(protocol::TopicAclRequest &request, protocol::TopicAclResponse &response) { ScopedLock lock(_mutex); CHECK_LATELY_ERROR; if (!_adminClient) { ec = createAdminClient(); if (ec != ERROR_NONE) { AUTIL_LOG(WARN, "create admin client error!"); checkError(ec, now); return ec; } } addRequestExtendInfo(request); DO_ADMIN_COMMON_RPC(_adminClient, topicAclManage, response); if (needResetAdminClient(ec)) { resetAdminClient(); } checkError(ec, now); return ec; } } // namespace network } // namespace swift