aios/apps/facility/swift/config/AdminConfig.cpp (563 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "swift/config/AdminConfig.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <ext/alloc_traits.h>
#include <memory>
#include <ostream>
#include "autil/StringUtil.h"
#include "autil/TimeUtility.h"
#include "fslib/fs/FileSystem.h"
#include "swift/config/AuthorizerParser.h"
#include "swift/config/ConfigDefine.h"
#include "swift/config/ConfigReader.h"
namespace swift {
namespace config {
AUTIL_LOG_SETUP(swift, AdminConfig);
using namespace std;
using namespace autil;
using namespace swift::config;
AdminConfig::AdminConfig()
: amonitorPort(DEFAULT_AMONITOR_PORT)
, brokerCount(0)
, adminCount(0)
, dispatchIntervalMs(1 * 1000)
, candidateWaitTimeMs(1000)
, brokerNetLimitMB(1000)
, cleanDataIntervalHour(1)
, delExpiredTopicIntervalSec(300)
, reserveDataHour(24 * 365)
, queueSize(DEFAULT_QUEUE_SIZE)
, threadNum(DEFAULT_THREAD_NUM)
, ioThreadNum(DEFAULT_IO_THREAD_NUM)
, anetTimeoutLoopIntervalMs(0)
, exclusiveListenThread(false)
, promoteIoThreadPriority(false)
, adminQueueSize(DEFAULT_QUEUE_SIZE)
, adminThreadNum(DEFAULT_THREAD_NUM)
, adminHttpQueueSize(DEFAULT_QUEUE_SIZE)
, adminHttpThreadNum(DEFAULT_THREAD_NUM)
, adminIoThreadNum(DEFAULT_IO_THREAD_NUM)
, decsionThreshold(DEFAULT_DECSION_THRESHOLD)
, leaderLeaseTime(DEFAULT_LEADER_LEASE_TIME)
, leaderLoopInterval(DEFAULT_LEADER_LOOP_INTERVAL)
, syncAdminInfoInterval(DEFAULT_SYNC_ADMIN_INFO_INTERVAL)
, syncTopicInfoInterval(DEFAULT_SYNC_TOPIC_INFO_INTERVAL)
, syncHeartbeatInterval(DEFAULT_SYNC_HEARTBEAT_INTERVAL)
, adjustResourceDuration(DEFAULT_ADJUST_RESOURCE_DURATION)
, exclusiveLevel(0)
, forceScheduleTimeSecond(-1)
, releaseDeadWorkerTimeSecond(DEFAULT_RELEASE_DEAD_WORKER_TIME)
, workerPartitionLimit(DEFAULT_WORKER_PARTITION_LIMIT)
, topicResourceLimit(DEFAULT_TOPIC_RESOURCE_LIMIT)
, minMaxPartitionBuffer(DEFAULT_MIN_MAX_PARTITION_BUFFER)
, brokerCfgTTlSec(DEFAULT_OBSOLETE_FILE_TIME_INTERVAL)
, recordLocalFile(-1)
, reserveBackMetaCount(200)
, requestProcessTimeRatio(DEFAULT_ADMIN_PROCESS_REQUEST_TIME_RATIO)
, noUseTopicExpireTimeSec(0)
, noUseTopicDeleteIntervalSec(0)
, missTopicRecoverIntervalSec(0)
, maxAllowNouseFileNum(10)
, updatePartResource(false)
, localMode(false)
, multiThreadDispatchTask(false)
, dispatchTaskThreadNum(10)
, heartbeatIntervalInUs(0)
, fastRecover(false)
, dealErrorBroker(false)
, errorBrokerDealRatio(DEFAULT_ADMIN_ERROR_BROKER_DEAL_RATIO)
, zfsTimeout(DEFAULT_ADMIN_ZFS_TIMEOUT)
, commitDelayThresholdInSec(DEFAULT_ADMIN_COMMIT_DELAY_THRESHOLD)
, brokerUnknownTimeout(DEFAULT_BROKER_UNKNOWN_TIMEOUT)
, deadBrokerTimeoutSec(180)
, forbidCampaignTimeMs(10000)
, useRecommendPort(true)
, chgPartcntIntervalUs(2000000)
, maxRestartCountInLocal(-1)
, backup(false)
, initialMasterVersion(0)
, forceSyncLeaderInfoInterval(DEFAULT_ADMIN_FORCE_SYNC_INTERVAL)
, longPeriodIntervalSec(3600 * 6)
, heartbeatThreadNum(DEFAULT_HEARTBEAT_THREAD_NUM)
, heartbeatQueueSize(DEFAULT_HEARTBEAT_QUEUE_SIZE)
, metaInfoReplicateInterval(DEFAULT_META_INFO_REPLICATOR_INTERVAL)
, maxTopicAclSyncIntervalUs(DEFAULT_MAX_TOPIC_ACL_SYNC_INTERVAL_US) {}
AdminConfig::~AdminConfig() {}
string AdminConfig::getApplicationId() const { return userName + "_" + serviceName; }
string AdminConfig::getUserName() const { return userName; }
string AdminConfig::getServiceName() const { return serviceName; }
string AdminConfig::getHippoRoot() const { return hippoRoot; }
string AdminConfig::getGatewayRoot() const { return gatewayRoot; }
string AdminConfig::getZkRoot() const { return zkRoot; }
uint16_t AdminConfig::getAmonitorPort() const { return amonitorPort; }
uint32_t AdminConfig::getBrokerCount() const { return brokerCount; }
uint32_t AdminConfig::getAdminCount() const { return adminCount; }
uint32_t AdminConfig::getDispatchIntervalMs() const { return dispatchIntervalMs; }
uint32_t AdminConfig::getCandidateWaitTimeMs() const { return candidateWaitTimeMs; };
uint32_t AdminConfig::getBrokerNetLimitMB() const { return brokerNetLimitMB; }
double AdminConfig::getCleanDataIntervalHour() const { return cleanDataIntervalHour; }
double AdminConfig::getDelExpiredTopicIntervalSec() const { return delExpiredTopicIntervalSec; }
double AdminConfig::getReserveDataByHour() const { return reserveDataHour; }
string AdminConfig::getDfsRoot() const { return dfsRoot; }
string AdminConfig::getExtendDfsRoot() const { return extendDfsRoot; }
string AdminConfig::getTodelDfsRoot() const { return todelDfsRoot; }
string AdminConfig::getConfigPath() const { return configPath; }
string AdminConfig::getBackMetaPath() const { return backMetaPath; }
int AdminConfig::getQueueSize() const { return queueSize; }
int AdminConfig::getThreadNum() const { return threadNum; }
int AdminConfig::getIoThreadNum() const { return ioThreadNum; }
int AdminConfig::getAdminHttpQueueSize() const { return adminHttpQueueSize; }
int AdminConfig::getAdminHttpThreadNum() const { return adminHttpThreadNum; }
int AdminConfig::getAdminQueueSize() const { return adminQueueSize; }
int AdminConfig::getAdminThreadNum() const { return adminThreadNum; }
int AdminConfig::getAdminIoThreadNum() const { return adminIoThreadNum; }
int AdminConfig::getAnetTimeoutLoopIntervalMs() const { return anetTimeoutLoopIntervalMs; }
bool AdminConfig::getExclusiveListenThread() const { return exclusiveListenThread; }
bool AdminConfig::getPromoteIoThreadPriority() const { return promoteIoThreadPriority; }
string AdminConfig::getConfigVersion() const { return configVersion; }
double AdminConfig::getDecsionThreshold() const { return decsionThreshold; }
void AdminConfig::setZkRoot(const string &zkRoot_) { zkRoot = zkRoot_; }
void AdminConfig::setWorkDir(const std::string &workDir_) { workDir = workDir_; }
const std::string &AdminConfig::getWorkDir() { return workDir; }
void AdminConfig::setLogConfFile(const std::string &logConfFile_) { logConfFile = logConfFile_; }
const std::string &AdminConfig::getLogConfFile() { return logConfFile; }
int32_t AdminConfig::getLeaderLeaseTime() const { return leaderLeaseTime; }
double AdminConfig::getLeaderLoopInterval() const { return leaderLoopInterval; }
const vector<string> &AdminConfig::getSyncAdminInfoPath() { return syncAdminInfoPathVec; }
int64_t AdminConfig::getSyncAdminInfoInterval() { return syncAdminInfoInterval; }
int64_t AdminConfig::getSyncTopicInfoInterval() { return syncTopicInfoInterval; }
int64_t AdminConfig::getSyncHeartbeatInterval() { return syncHeartbeatInterval; }
int64_t AdminConfig::getAdjustResourceDuration() { return adjustResourceDuration; }
const map<string, uint32_t> &AdminConfig::getGroupBrokerCountMap() { return groupBrokerCountMap; }
const map<string, uint32_t> &AdminConfig::getGroupNetPriorityMap() const { return groupNetPriorityMap; }
BrokerExclusiveLevel AdminConfig::getExclusiveLevel() const {
if (exclusiveLevel >= BROKER_EL_UNKNOWN) {
return BROKER_EL_ALL;
} else {
return (BrokerExclusiveLevel)exclusiveLevel;
}
}
int64_t AdminConfig::getForceScheduleTimeSecond() const { return forceScheduleTimeSecond; }
int AdminConfig::getRecordLocalFile() const { return recordLocalFile; }
uint32_t AdminConfig::getReserveBackMetaCount() const { return reserveBackMetaCount; }
int64_t AdminConfig::getReleaseDeadWorkerTimeSecond() const { return releaseDeadWorkerTimeSecond; }
int64_t AdminConfig::getWorkPartitionLimit() const { return workerPartitionLimit; }
int64_t AdminConfig::getTopicResourceLimit() const { return topicResourceLimit; }
int64_t AdminConfig::getMinMaxPartitionBufferSizeMb() const { return minMaxPartitionBuffer; }
int64_t AdminConfig::getBrokerCfgTTlSec() const { return brokerCfgTTlSec; }
uint32_t AdminConfig::getRequestProcessTimeRatio() const { return requestProcessTimeRatio; }
uint32_t AdminConfig::getNoUseTopicExpireTimeSec() const { return noUseTopicExpireTimeSec; }
uint32_t AdminConfig::getNoUseTopicDeleteIntervalSec() const { return noUseTopicDeleteIntervalSec; }
uint32_t AdminConfig::getMissTopicRecoverIntervalSec() const { return missTopicRecoverIntervalSec; }
uint32_t AdminConfig::getMaxAllowNouseFileNum() const { return maxAllowNouseFileNum; }
bool AdminConfig::autoUpdatePartResource() const { return updatePartResource; }
bool AdminConfig::isLocalMode() const { return localMode; }
bool AdminConfig::enableMultiThreadDispatchTask() const { return multiThreadDispatchTask; }
uint32_t AdminConfig::getDispatchTaskThreadNum() const { return dispatchTaskThreadNum; }
bool AdminConfig::enableFastRecover() const { return fastRecover; }
uint32_t AdminConfig::getHeartbeatIntervalInUs() const { return heartbeatIntervalInUs; }
string AdminConfig::getTopicScdType() const { return topicScdType; }
const std::vector<std::pair<std::string, std::string>> &AdminConfig::getTopicGroupVec() { return topicGroupVec; }
const vector<pair<string, vector<string>>> &AdminConfig::getTopicOwnerVec() { return topicOwnerVec; }
const map<string, uint32_t> &AdminConfig::getVeticalGroupBrokerCountMap() { return veticalGroupBrokerCountMap; }
bool AdminConfig::getDealErrorBroker() const { return dealErrorBroker; }
double AdminConfig::getErrorBrokerDealRatio() const { return errorBrokerDealRatio; }
string AdminConfig::getReportZombieUrl() const { return reportZombieUrl; }
int64_t AdminConfig::getZfsTimeout() const { return zfsTimeout; }
int64_t AdminConfig::getCommitDelayThreshold() const { return commitDelayThresholdInSec; }
int64_t AdminConfig::getBrokerUnknownTimeout() const { return brokerUnknownTimeout; }
uint32_t AdminConfig::getDeadBrokerTimeoutSec() const { return deadBrokerTimeoutSec; }
uint64_t AdminConfig::getForbidCampaignTimeMs() const { return forbidCampaignTimeMs; }
bool AdminConfig::getUseRecommendPort() const { return useRecommendPort; }
uint64_t AdminConfig::getChgPartcntIntervalUs() const { return chgPartcntIntervalUs; }
int32_t AdminConfig::getMaxRestartCountInLocal() const { return maxRestartCountInLocal; }
bool AdminConfig::enableBackup() const { return backup; }
uint64_t AdminConfig::getInitialMasterVersion() const { return initialMasterVersion; }
int64_t AdminConfig::getForceSyncLeaderInfoInterval() const { return forceSyncLeaderInfoInterval; }
string AdminConfig::getMirrorZkRoot() const { return mirrorZkRoot; }
const AuthorizerInfo &AdminConfig::getAuthenticationConf() const { return authConf; }
int64_t AdminConfig::getLongPeriodIntervalSec() const { return longPeriodIntervalSec; }
uint32_t AdminConfig::getHeartbeatThreadNum() const { return heartbeatThreadNum; }
uint32_t AdminConfig::getHeartbeatQueueSize() const { return heartbeatQueueSize; }
const vector<string> &AdminConfig::getCleanAtDeleteTopicPatterns() const { return cleanAtDeleteTopicPatterns; }
int64_t AdminConfig::getMetaInfoReplicateInterval() const { return metaInfoReplicateInterval; }
int64_t AdminConfig::getMaxTopicAclSyncIntervalUs() const { return maxTopicAclSyncIntervalUs; }
string AdminConfig::parseConfigVersion(const string &configPath) {
vector<string> configStrVec = StringUtil::split(configPath, "/");
if (configStrVec.size() != 0) {
return configStrVec[configStrVec.size() - 1];
} else {
AUTIL_LOG(WARN, "config path is empty, can't get config version");
return "";
}
}
AdminConfig *AdminConfig::loadConfig(const string &configPath) {
string configFile = fslib::fs::FileSystem::joinFilePath(configPath, SWIFT_CONF_FILE_NAME);
ConfigReader reader;
if (!reader.read(configFile)) {
AUTIL_LOG(ERROR, "Failed to read %s.", configFile.c_str());
return NULL;
}
if (!reader.hasSection(SECTION_COMMON)) {
AUTIL_LOG(ERROR, "Failed to find section[%s].", SECTION_COMMON);
return NULL;
}
if (!reader.hasSection(SECTION_ADMIN)) {
AUTIL_LOG(ERROR, "Failed to find section[%s].", SECTION_ADMIN);
return NULL;
}
if (!reader.hasSection(SECTION_BROKER)) {
AUTIL_LOG(ERROR, "Failed to find section[%s].", SECTION_BROKER);
return NULL;
}
unique_ptr<AdminConfig> adminConfig(new AdminConfig);
adminConfig->configPath = configPath;
adminConfig->configVersion = parseConfigVersion(configPath);
#define GET_ADMIN_CONFIG_VALUE(type, name, value, need, sectionName) \
{ \
type v; \
if (reader.getOption(sectionName, name, v)) { \
adminConfig->value = v; \
} else if (need) { \
AUTIL_LOG(ERROR, "Invalid config: [%s] is needed", name); \
return NULL; \
} \
}
GET_ADMIN_CONFIG_VALUE(uint16_t, AMONITOR_PORT, amonitorPort, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, USER_NAME, userName, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, SERVICE_NAME, serviceName, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, HIPPO_ROOT_PATH, hippoRoot, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, GATEWAY_ROOT_PATH, gatewayRoot, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, ZOOKEEPER_ROOT_PATH, zkRoot, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(uint32_t, BROKER_COUNT, brokerCount, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(uint32_t, ADMIN_COUNT, adminCount, true, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(uint32_t, BROKER_EXCLUSIVE_LEVEL, exclusiveLevel, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(int32_t, LEADER_LEASE_TIME, leaderLeaseTime, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(double, LEADER_LOOP_INTERVAL, leaderLoopInterval, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(bool, USE_RECOMMEND_PORT, useRecommendPort, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(bool, IS_LOCAL_MODE, localMode, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(bool, ENABLE_BACKUP, backup, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(uint64_t, INITIAL_MASTER_VERSION, initialMasterVersion, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(string, MIRROR_ZKROOT, mirrorZkRoot, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(int64_t, MAX_TOPIC_ACL_SYNC_INTERVAL_US, maxTopicAclSyncIntervalUs, false, SECTION_COMMON);
GET_ADMIN_CONFIG_VALUE(uint32_t, DISPATCH_INTERVAL, dispatchIntervalMs, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, CANDIDATE_WAIT_TIME_MS, candidateWaitTimeMs, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, BROKER_NETLIMIT_MB, brokerNetLimitMB, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(double, CLEAN_DATA_INTERVAL_HOUR, cleanDataIntervalHour, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(
double, DELETE_EXPIRED_TOPIC_INTERVAL_SECOND, delExpiredTopicIntervalSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(double, RESERVE_DATA_HOUR, reserveDataHour, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(double, DECSION_THRESHOLD, decsionThreshold, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, ADJUST_RESOURCE_DURATION_MS, adjustResourceDuration, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, FORCE_SCHEDULE_TIME, forceScheduleTimeSecond, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, RELEASE_DEAD_WORKER_TIME, releaseDeadWorkerTimeSecond, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, TOPIC_RESOURCE_LIMIT, topicResourceLimit, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, WORKER_PARTITION_LIMIT, workerPartitionLimit, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, SYNC_ADMIN_INFO_INTERVAL, syncAdminInfoInterval, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, SYNC_TOPIC_INFO_INTERVAL, syncTopicInfoInterval, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, SYNC_HEARTBEAT_INTERVAL, syncHeartbeatInterval, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int32_t, MAX_RESTART_COUNT_IN_LOCAL, maxRestartCountInLocal, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, ADMIN_THREAD_QUEUE_SIZE, adminQueueSize, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, ADMIN_THREAD_NUM, adminThreadNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, ADMIN_HTTP_THREAD_QUEUE_SIZE, adminHttpQueueSize, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, ADMIN_HTTP_THREAD_NUM, adminHttpThreadNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, MIN_MAX_PARTITION_BUFFER_SIZE_LIMIT, minMaxPartitionBuffer, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, FORCE_SYNC_LEADER_INFO_INTERVAL, forceSyncLeaderInfoInterval, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, OBSOLETE_FILE_TIME_INTERVAL_HOUR, brokerCfgTTlSec, false, SECTION_BROKER);
if (DEFAULT_OBSOLETE_FILE_TIME_INTERVAL != adminConfig->brokerCfgTTlSec) {
adminConfig->brokerCfgTTlSec *= 3600;
}
GET_ADMIN_CONFIG_VALUE(int64_t, OBSOLETE_FILE_TIME_INTERVAL_SEC, brokerCfgTTlSec, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(uint32_t, ADMIN_REQUEST_PROCESS_TIME_RATIO, requestProcessTimeRatio, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, ADMIN_IO_THREAD_NUM, adminIoThreadNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int, RECORD_LOCAL_FILE, recordLocalFile, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, RESERVE_BACK_META_COUNT, reserveBackMetaCount, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(string, BACKUP_META_PATH, backMetaPath, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(bool, AUTO_UPDATE_PART_RESOURCE, updatePartResource, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(bool, ENABLE_MULTI_TRHEAD_DISPATCH_TASK, multiThreadDispatchTask, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, DISPATCH_TASK_THREAD_NUM, dispatchTaskThreadNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, NOUSE_TOPIC_EXPIRE_TIME_SEC, noUseTopicExpireTimeSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(
uint32_t, NOUSE_TOPIC_DELETE_INTERVAL_SEC, noUseTopicDeleteIntervalSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(
uint32_t, MISS_TOPIC_RECOVER_INTERVAL_SEC, missTopicRecoverIntervalSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, MAX_ALLOW_NOUSE_FILE_NUM, maxAllowNouseFileNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(string, TOPIC_SCHEDULE_TYPE, topicScdType, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(bool, DEAL_ERROR_BROKER, dealErrorBroker, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(double, ERROR_BROKER_DEAL_RATIO, errorBrokerDealRatio, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(string, REPORT_ZOMBIE_URL, reportZombieUrl, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, CS_ZFS_TIMEOUT, zfsTimeout, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, CS_COMMIT_DELAY_THRESHOLD_SEC, commitDelayThresholdInSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, BROKER_UNKNOWN_TIMEOUT, brokerUnknownTimeout, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, DEAD_BROKER_TIMEOUT_SEC, deadBrokerTimeoutSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint64_t, FORBID_CAMPAIGN_TIME_MS, forbidCampaignTimeMs, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint64_t, CHG_TOPIC_PARTCNT_INTERVAL_US, chgPartcntIntervalUs, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, LONG_PERIOD_INTERVAL_SEC, longPeriodIntervalSec, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, HEARTBEAT_THREAD_NUM, heartbeatThreadNum, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(uint32_t, HEARTBEAT_QUEUE_SIZE, heartbeatQueueSize, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(int64_t, META_INFO_REPLICATE_INTERVAL, metaInfoReplicateInterval, false, SECTION_ADMIN);
GET_ADMIN_CONFIG_VALUE(string, DFS_ROOT_PATH, dfsRoot, true, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(string, EXTEND_DFS_ROOT_PATH, extendDfsRoot, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(string, TODEL_DFS_ROOT_PATH, todelDfsRoot, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(int, THREAD_QUEUE_SIZE, queueSize, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(int, THREAD_NUM, threadNum, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(int, IO_THREAD_NUM, ioThreadNum, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(int, ANET_TIMEOUT_LOOP_INTERVAL_MS, anetTimeoutLoopIntervalMs, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(bool, EXCLUSIVE_LISTEN_THREAD, exclusiveListenThread, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(bool, PROMOTE_IO_THREAD_PRIORITY, promoteIoThreadPriority, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(bool, ENABLE_FAST_RECOVER, fastRecover, false, SECTION_BROKER);
GET_ADMIN_CONFIG_VALUE(int, HEARTBEAT_INTERVAL_IN_US, heartbeatIntervalInUs, false, SECTION_BROKER);
string CADTopicPatterns;
if (reader.getOption(SECTION_ADMIN, CLEAN_AT_DELETE_TOPIC_PATTERNS, CADTopicPatterns)) {
adminConfig->cleanAtDeleteTopicPatterns = StringUtil::split(CADTopicPatterns, ";");
}
string syncPath;
if (reader.getOption(SECTION_ADMIN, SYNC_ADMIN_INFO_PATH, syncPath)) {
adminConfig->syncAdminInfoPathVec = StringUtil::split(syncPath, ";");
}
string groupInfo;
if (reader.getOption(SECTION_COMMON, GROUP_BROKER_COUNT, groupInfo)) {
const vector<string> &groupVec = StringUtil::split(groupInfo, ";");
for (size_t i = 0; i < groupVec.size(); i++) {
const vector<string> &infoVec = StringUtil::split(groupVec[i], ":");
if (infoVec.size() == 2) {
auto count = StringUtil::fromString<uint32_t>(infoVec[1]);
adminConfig->groupBrokerCountMap[infoVec[0]] = count;
adminConfig->veticalGroupBrokerCountMap[infoVec[0]] = count;
}
}
}
string groupNetInfo;
if (reader.getOption(SECTION_COMMON, GROUP_NET_PRIORITY, groupNetInfo)) {
const vector<string> &groupNetVec = StringUtil::split(groupNetInfo, ";");
for (const auto &netInfo : groupNetVec) {
vector<string> infoVec = StringUtil::split(netInfo, ":");
if (infoVec.size() != 2) {
AUTIL_LOG(WARN, "parse group net priority failed [%s].", netInfo.c_str());
continue;
}
uint32_t value;
if (!StringUtil::fromString<uint32_t>(infoVec[1], value)) {
AUTIL_LOG(WARN, "parse group net priority failed [%s].", infoVec[1].c_str());
continue;
}
adminConfig->groupNetPriorityMap[std::move(infoVec[0])] = value;
}
}
string groupTopicInfo;
if (reader.getOption(SECTION_ADMIN, TOPIC_GROUP_VEC, groupTopicInfo)) {
const vector<string> &groupVec = StringUtil::split(groupTopicInfo, ";");
for (size_t i = 0; i < groupVec.size(); i++) {
const vector<string> &infoVec = StringUtil::split(groupVec[i], ":");
if (infoVec.size() == 2) {
adminConfig->topicGroupVec.emplace_back(infoVec[0], infoVec[1]);
}
}
}
string topicOwnerInfo;
if (reader.getOption(SECTION_ADMIN, TOPIC_OWNER_VEC, topicOwnerInfo)) {
const vector<string> &ownerVec = StringUtil::split(topicOwnerInfo, ";");
for (size_t i = 0; i < ownerVec.size(); i++) {
const vector<string> &infoVec = StringUtil::split(ownerVec[i], ":");
if (infoVec.size() == 2) {
const vector<string> &owners = StringUtil::split(infoVec[1], ",");
adminConfig->topicOwnerVec.emplace_back(infoVec[0], owners);
}
}
}
adminConfig->groupBrokerCountMap[DEFAULT_GROUP_NAME] = adminConfig->brokerCount;
adminConfig->veticalGroupBrokerCountMap[DEFAULT_GROUP_NAME] = adminConfig->brokerCount;
adminConfig->brokerCount = 0;
map<string, uint32_t>::iterator iter = adminConfig->groupBrokerCountMap.begin();
for (; iter != adminConfig->groupBrokerCountMap.end(); iter++) {
adminConfig->brokerCount += iter->second;
}
string veticalBrokerCntInfo;
if (reader.getOption(SECTION_ADMIN, VETICAL_BROKERCNT_VEC, veticalBrokerCntInfo)) {
const vector<string> &vbrokerVec = StringUtil::split(veticalBrokerCntInfo, ";");
for (size_t i = 0; i < vbrokerVec.size(); i++) {
const vector<string> &cntVec = StringUtil::split(vbrokerVec[i], ":");
uint32_t count = 0;
if (cntVec.size() == 2 && StringUtil::strToUInt32(cntVec[1].c_str(), count)) {
adminConfig->veticalGroupBrokerCountMap[cntVec[0]] = count;
}
}
}
AuthorizerParser authorizerParser;
if (!authorizerParser.parseAuthorizer(reader, adminConfig->authConf)) {
AUTIL_LOG(ERROR, "lack of necessary internal authentication");
return NULL;
}
if (!adminConfig->validate()) {
AUTIL_LOG(WARN, "swift config validate failed!");
return NULL;
}
return adminConfig.release();
}
bool AdminConfig::validate() const {
if (userName.empty()) {
AUTIL_LOG(ERROR, "UserName can't be empty.");
return false;
}
if (serviceName.empty()) {
AUTIL_LOG(ERROR, "ServiceName can't be empty.");
return false;
}
if (!localMode && hippoRoot.empty()) {
AUTIL_LOG(ERROR, "HippoRoot can't be empty.");
return false;
}
if (zkRoot.empty()) {
AUTIL_LOG(ERROR, "Zookeeper can't be empty.");
return false;
}
if (brokerCount <= 0) {
AUTIL_LOG(ERROR, "Broker count must greater than 0.");
return false;
}
if (adminCount <= 0) {
AUTIL_LOG(ERROR, "Admin count must greater than 0.");
return false;
}
if (decsionThreshold > 1) {
AUTIL_LOG(ERROR, "Decsion threshold can't greater than 1.");
return false;
}
if (!dfsRoot.empty() && dfsRoot == backMetaPath) {
AUTIL_LOG(ERROR, "backMetaPath should not same as dfsRoot");
return false;
}
if (reserveBackMetaCount < 10) {
AUTIL_LOG(ERROR, "reserveBackMetaCount should bigger than 10");
return false;
}
if (requestProcessTimeRatio < 10 || requestProcessTimeRatio > 100) {
AUTIL_LOG(ERROR, "requestProcessTimeRatio should between 10 and 100");
return false;
}
if (brokerNetLimitMB < 10) {
AUTIL_LOG(ERROR, "brokerNetLimitMB should larger than 10");
return false;
}
if ("vetical" == topicScdType) {
for (const auto &group : groupBrokerCountMap) {
const auto iter = veticalGroupBrokerCountMap.find(group.first);
if (iter != veticalGroupBrokerCountMap.end() && (iter->second > group.second || 0 == iter->second)) {
AUTIL_LOG(ERROR,
"group[%s] vetical broker count[%u] invalid, "
"should > 0 and <= total[%u]",
group.first.c_str(),
iter->second,
group.second);
return false;
}
}
}
if (backup) {
if (!fastRecover) {
AUTIL_LOG(ERROR, "fast recover should enabled when enable backup");
return false;
}
if (mirrorZkRoot.empty()) {
AUTIL_LOG(ERROR, "mirror zk root should config when enable backup");
return false;
}
if (1 != syncAdminInfoPathVec.size()) {
AUTIL_LOG(ERROR, "sync admin info should config only one when enable backup");
return false;
}
}
if (cleanAtDeleteTopicPatterns.size() != 0) {
for (int i = 0; i < cleanAtDeleteTopicPatterns.size(); i++) {
if (cleanAtDeleteTopicPatterns[i] == "") {
AUTIL_LOG(ERROR, "clean at delete topic pattern should not be empty");
return false;
}
}
}
return true;
}
std::string AdminConfig::getConfigStr() {
std::ostringstream oss;
oss << boolalpha << "userName:" << userName << " serviceName:" << serviceName << " hippoRoot:" << hippoRoot
<< " zkRoot:" << zkRoot << " configVersion:" << configVersion << " amonitorPort:" << amonitorPort
<< " brokerCount:" << brokerCount << " adminCount:" << adminCount
<< " dispatchIntervalMs:" << dispatchIntervalMs << " candidateWaitTimeMs:" << candidateWaitTimeMs
<< " brokerNetLimitMB:" << brokerNetLimitMB << " dfsRoot:" << dfsRoot << " extendDfsRoot:" << extendDfsRoot
<< " todelDfsRoot:" << todelDfsRoot << " configPath:" << configPath << " backMetaPath:" << backMetaPath
<< " cleanDataIntervalHour:" << cleanDataIntervalHour
<< " delExpiredTopicIntervalSec:" << delExpiredTopicIntervalSec << " reserveDataHour:" << reserveDataHour
<< " queueSize:" << queueSize << " threadNum:" << threadNum << " ioThreadNum:" << ioThreadNum
<< " anetTimeoutLoopIntervalMs:" << anetTimeoutLoopIntervalMs << " adminQueueSize:" << adminQueueSize
<< " adminThreadNum:" << adminThreadNum << " adminHttpQueueSize:" << adminHttpQueueSize
<< " adminHttpThreadNum:" << adminHttpThreadNum << " adminIoThreadNum:" << adminIoThreadNum
<< " decsionThreshold:" << decsionThreshold << " leaderLeaseTime:" << leaderLeaseTime
<< " leaderLoopInterval:" << leaderLoopInterval << " syncAdminInfoPathVec:[";
for (auto &s : syncAdminInfoPathVec) {
oss << s << " ";
}
oss << "] syncAdminInfoInterval:" << syncAdminInfoInterval << " syncTopicInfoInterval:" << syncTopicInfoInterval
<< " syncHeartbeatInterval:" << syncHeartbeatInterval << " groupBrokerCountMap:[";
for (auto &item : groupBrokerCountMap) {
oss << item.first << ":" << item.second << " ";
}
oss << "] veticalGroupBrokerCountMap:[";
for (auto &item : veticalGroupBrokerCountMap) {
oss << item.first << ":" << item.second << " ";
}
oss << "] groupNetPriorityMap:[";
for (auto &item : groupNetPriorityMap) {
oss << item.first << ":" << item.second << " ";
}
oss << "] adjustResourceDuration:" << adjustResourceDuration << " exclusiveLevel:" << exclusiveLevel
<< " forceScheduleTimeSecond:" << forceScheduleTimeSecond
<< " releaseDeadWorkerTimeSecond:" << releaseDeadWorkerTimeSecond
<< " workerPartitionLimit:" << workerPartitionLimit << " topicResourceLimit:" << topicResourceLimit
<< " minMaxPartitionBuffer:" << minMaxPartitionBuffer << " brokerCfgTTlSec:" << brokerCfgTTlSec
<< " recordLocalFile:" << recordLocalFile << " reserveBackMetaCount:" << reserveBackMetaCount
<< " requestProcessTimeRatio:" << requestProcessTimeRatio
<< " noUseTopicExpireTimeSec:" << noUseTopicExpireTimeSec
<< " noUseTopicDeleteIntervalSec:" << noUseTopicDeleteIntervalSec
<< " missTopicRecoverIntervalSec:" << missTopicRecoverIntervalSec
<< " maxAllowNouseFileNum:" << maxAllowNouseFileNum << " autoUpdatePartResource:" << updatePartResource
<< " localMode:" << localMode << " workDir:" << workDir << " logConfFile:" << logConfFile
<< " multiThreadDispatchTask:" << multiThreadDispatchTask << " dispatchTaskThreadNum:" << dispatchTaskThreadNum
<< " heartbeatIntervalInUs:" << heartbeatIntervalInUs << " topicScdType:" << topicScdType
<< " dealErrorBroker:" << dealErrorBroker << " errorBrokerDealRatio:" << errorBrokerDealRatio
<< " reportZombieUrl:" << reportZombieUrl << " zfsTimeout:" << zfsTimeout
<< " commitDelayThreshold:" << commitDelayThresholdInSec << " brokerUnknownTimeout:" << brokerUnknownTimeout
<< " deadBrokerTimeoutSec:" << deadBrokerTimeoutSec << " forbidCampaignTimeMs:" << forbidCampaignTimeMs
<< " useRecommendPort:" << useRecommendPort << " chgPartcntIntervalUs:" << chgPartcntIntervalUs
<< " maxRestartCountInLocal:" << maxRestartCountInLocal << " backup:" << backup
<< " initialMasterVersion:" << initialMasterVersion
<< " forceSyncLeaderInfoInterval:" << forceSyncLeaderInfoInterval << " mirrorZkRoot:" << mirrorZkRoot
<< " longPeriodIntervalSec:" << longPeriodIntervalSec << " heartbeatThreadNum:" << heartbeatThreadNum
<< " heartbeatQueueSize:" << heartbeatQueueSize << " maxTopicAclSyncIntervalUs:" << maxTopicAclSyncIntervalUs
<< " enableAuthentication:" << authConf.getEnable() << " authentications:[";
map<string, string> authentications = authConf.getSysUsers();
for (auto &item : authentications) {
oss << item.first << ":" << item.second << " ";
}
oss << "] topicGroupVec:[";
for (auto &item : topicGroupVec) {
oss << item.first << ":" << item.second << " ";
}
oss << "] topicOwnerVec:[";
for (auto &item : topicOwnerVec) {
oss << item.first << ":(";
for (auto &owner : item.second) {
oss << owner << " ";
}
oss << ") ";
}
oss << "] cleanAtDeleteTopicPatterns:[";
for (auto &item : cleanAtDeleteTopicPatterns) {
oss << item << " ";
}
oss << "]";
oss << " metaInfoReplicateInterval:" << metaInfoReplicateInterval;
return oss.str();
}
} // namespace config
} // namespace swift