aios/suez/worker/EnvParam.cpp (269 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "suez/worker/EnvParam.h"
#include "autil/EnvUtil.h"
#include "autil/StringUtil.h"
#include "suez/sdk/CmdLineDefine.h"
using namespace std;
using namespace autil;
namespace suez {
AUTIL_DECLARE_AND_SETUP_LOGGER(suez, EnvParam);
static const string KMONITOR_KEYVALUE_SEP = "^";
static const string KMONITOR_MULTI_SEP = "@";
static const string ASYNC_INTER_EXECUTOR_THREAD_NUM = "asyncInterExecutorThreadNum";
static const string ASYNC_INTER_EXECUTOR_TYPE = "asyncInterExecutorType";
static const string ASYNC_INTRA_EXECUTOR_THREAD_NUM = "asyncIntraExecutorThreadNum";
static const string ASYNC_INTRA_EXECUTOR_TYPE = "asyncIntraExecutorType";
EnvParam::EnvParam()
: httpThreadNum(8)
, httpQueueSize(10000)
, dpThreadNum(10)
, loadThreadNum(10)
, httpIoThreadNum(5)
, decisionLoopInterval(1 * 1000 * 1000)
, waitForDebugger(false)
, decodeUri(false)
, haCompatible(false)
, enableSapSearch(false)
, allowPartialTableReady(false)
, needShutdownGracefully(false)
, noDiskQuotaCheck(false)
, reportLoopInterval(1 * 1000 * 1000)
, discardMetricSampleInterval(60)
, kmonitorEnableLogFileSink(false)
, kmonitorEnablePrometheusSink(false)
, kmonitorManuallyMode(false)
, kmonitorNormalSamplePeriod(-1)
, asyncInterExecutorThreadNum(1)
, asyncIntraExecutorThreadNum(1)
, debugMode(false)
, localMode(false)
, rdmaRpcThreadNum(8)
, rdmaRpcQueueSize(1000)
, rdmaIoThreadNum(2)
, rdmaWorkerThreadNum(5)
, gigGrpcThreadNum(10)
, gigEnableAgentStat(true) {}
bool EnvParam::parseKMonitorTags(const string &tagsStr, map<string, string> &tagsMap) {
auto tagVec = StringUtil::split(tagsStr, KMONITOR_MULTI_SEP);
for (const auto &tags : tagVec) {
auto kvVec = StringUtil::split(tags, KMONITOR_KEYVALUE_SEP);
if (kvVec.size() != 2) {
AUTIL_LOG(WARN, "parse kmonitor tags [%s] failed.", tags.c_str());
return false;
}
StringUtil::trim(kvVec[0]);
StringUtil::trim(kvVec[1]);
tagsMap[kvVec[0]] = kvVec[1];
}
return true;
}
string EnvParam::tagsToString(const map<string, string> &tagsMap) {
vector<string> tagsVec;
for (auto &pair : tagsMap) {
tagsVec.emplace_back(pair.first + KMONITOR_KEYVALUE_SEP + pair.second);
}
return StringUtil::toString(tagsVec, KMONITOR_MULTI_SEP);
}
bool EnvParam::tryExtractPosIntValueFromEnv(const string &envName, int &value) {
auto envStr = autil::EnvUtil::getEnv(envName, "");
if (!envStr.empty()) {
if (!StringUtil::fromString<int>(envStr, value) || value <= 0) {
AUTIL_LOG(ERROR, "invalid env %s [%s]", envName.c_str(), envStr.c_str());
return false;
}
}
return true;
}
bool EnvParam::init() {
httpPort = autil::EnvUtil::getEnv(HTTP_PORT, "");
if (!tryExtractPosIntValueFromEnv(HTTP_THREAD_NUM, httpThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(HTTP_QUEUE_SIZE, httpQueueSize)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(HTTP_IO_THREAD_NUM, httpIoThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(DP_THREAD_NUM, dpThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(LOAD_THREAD_NUM, loadThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(DECISION_LOOP_INTERVAL, decisionLoopInterval)) {
return false;
}
auto waitForDebuggerStr = autil::EnvUtil::getEnv(WAIT_FOR_DEBUGGER, "");
if (waitForDebuggerStr == "true") {
waitForDebugger = true;
}
auto decodeUriStr = autil::EnvUtil::getEnv(DECODE_URI, "");
if (decodeUriStr == "true") {
decodeUri = true;
}
auto haCompatibleStr = autil::EnvUtil::getEnv(HA_COMPATABLE, "");
if (haCompatibleStr == "true") {
haCompatible = true;
}
auto enableSapSearchStr = autil::EnvUtil::getEnv("enableSap", "");
if (!enableSapSearchStr.empty()) {
enableSapSearch = true;
}
auto allowPartialTableReadyStr = autil::EnvUtil::getEnv(ALLOW_PARTIAL_TABLE_READY, "");
if (allowPartialTableReadyStr == "true") {
allowPartialTableReady = true;
}
auto needShutdownGracefullyStr = autil::EnvUtil::getEnv(NEED_SHUTDOWN_GRACEFULLY, "");
if (needShutdownGracefullyStr == "true") {
needShutdownGracefully = true;
}
auto noDiskQuotaCheckStr = autil::EnvUtil::getEnv(NO_DISK_QUOTA_CHECK, "");
if (noDiskQuotaCheckStr == "true") {
noDiskQuotaCheck = true;
}
if (!tryExtractPosIntValueFromEnv(REPORT_INDEX_STATUS_LOOP_INTERVAL, reportLoopInterval)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(DISCARD_METRIC_SAMPLE_INTERVAL, discardMetricSampleInterval)) {
return false;
}
serviceName = autil::EnvUtil::getEnv(SERVICE_NAME, "suez_service");
amonitorPort = autil::EnvUtil::getEnv(AMONITOR_PORT, "10086");
amonitorPath = autil::EnvUtil::getEnv(AMONITOR_PATH, "");
amonitorPathStyle = autil::EnvUtil::getEnv(AMONITOR_PATH_STYLE, "");
roleType = autil::EnvUtil::getEnv(ROLE_TYPE, "");
roleName = autil::EnvUtil::getEnv(ROLE_NAME, "");
zoneName = autil::EnvUtil::getEnv(ZONE_NAME, "");
partId = autil::EnvUtil::getEnv(PART_ID, "");
hippoSlaveIp = autil::EnvUtil::getEnv(HIPPO_SLAVE_IP, "127.0.0.1");
hippoDp2SlavePort = autil::EnvUtil::getEnv(HIPPO_DP2_SLAVE_PORT, "");
carbonIdentifier = autil::EnvUtil::getEnv(WORKER_IDENTIFIER_FOR_CARBON, "");
procInstanceId = autil::EnvUtil::getEnv(HIPPO_PROC_INSTANCEID, "");
packageCheckSum = autil::EnvUtil::getEnv(CARBON_PACKAGE_CHECKSUM, "");
/***
for kmon
***/
kmonitorPort = autil::EnvUtil::getEnv(KMONITOR_PORT, "4141");
kmonitorServiceName = autil::EnvUtil::getEnv(KMONITOR_SERVICE_NAME, "");
kmonitorSinkAddress = autil::EnvUtil::getEnv(KMONITOR_SINK_ADDRESS, hippoSlaveIp);
kmonitorEnableLogFileSink = autil::EnvUtil::getEnv(KMONITOR_ENABLE_LOGFILE_SINK, kmonitorEnableLogFileSink);
kmonitorEnablePrometheusSink = autil::EnvUtil::getEnv(KMONITOR_ENABLE_PROM_SINK, kmonitorEnablePrometheusSink);
kmonitorManuallyMode = autil::EnvUtil::getEnv(KMONITOR_MANUALLY_MODE, kmonitorManuallyMode);
kmonitorTenant = autil::EnvUtil::getEnv(KMONITOR_TENANT, "default");
kmonitorMetricsPrefix = autil::EnvUtil::getEnv(KMONITOR_METRICS_PREFIX, "");
kmonitorGlobalTableMetricsPrefix = autil::EnvUtil::getEnv(KMONITOR_GLOBAL_TABLE_METRICS_PREFIX, "");
kmonitorTableMetricsPrefix = autil::EnvUtil::getEnv(KMONITOR_TABLE_METRICS_PREFIX, "");
kmonitorMetricsReporterCacheLimit = autil::EnvUtil::getEnv(KMONITOR_METRICS_REPORTER_CACHE_LIMIT, "");
string kmonitorTagsStr = autil::EnvUtil::getEnv(KMONITOR_TAGS, "");
if (!kmonitorTagsStr.empty() && !parseKMonitorTags(kmonitorTagsStr, kmonitorTags)) {
return false;
}
kmonitorNormalSamplePeriod = autil::EnvUtil::getEnv(KMONITOR_NORMAL_SAMPLE_PERIOD, -1);
if (!tryExtractPosIntValueFromEnv(ASYNC_INTER_EXECUTOR_THREAD_NUM, asyncInterExecutorThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(ASYNC_INTRA_EXECUTOR_THREAD_NUM, asyncIntraExecutorThreadNum)) {
return false;
}
asyncInterExecutorType = autil::EnvUtil::getEnv(ASYNC_INTER_EXECUTOR_TYPE, "hos");
asyncIntraExecutorType = autil::EnvUtil::getEnv(ASYNC_INTRA_EXECUTOR_TYPE, "hos");
debugMode = autil::EnvUtil::getEnv(DEBUG_MODE, false);
localMode = autil::EnvUtil::getEnv(LOCAL_MODE, false);
if (!initRdma()) {
return false;
}
if (!initGrpc()) {
return false;
}
AUTIL_LOG(INFO, "params after init: %s", toString().c_str());
return check();
}
bool EnvParam::initRdma() {
rdmaPort = autil::EnvUtil::getEnv(RDMA_PORT, "");
if (!tryExtractPosIntValueFromEnv(RDMA_RPC_THREAD_NUM, rdmaRpcThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(RDMA_RPC_QUEUE_SIZE, rdmaRpcQueueSize)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(RDMA_WORKER_THREAD_NUM, rdmaWorkerThreadNum)) {
return false;
}
if (!tryExtractPosIntValueFromEnv(RDMA_IO_THREAD_NUM, rdmaIoThreadNum)) {
return false;
}
return true;
}
bool EnvParam::initGrpc() {
gigGrpcPort = EnvUtil::getEnv(GIG_GRPC_PORT);
gigGrpcThreadNum = EnvUtil::getEnv(GIG_GRPC_THREAD_NUM, gigGrpcThreadNum);
grpcCertsDir = EnvUtil::getEnv(GIG_GRPC_CERTS_DIR);
grpcTargetName = EnvUtil::getEnv(GIG_GRPC_TARGET_NAME);
gigEnableAgentStat = EnvUtil::getEnv(GIG_ENABLE_AGENT_STAT, true);
return true;
}
const string EnvParam::toString() const {
stringstream ss;
ss << "httpPort:" << httpPort << "; "
<< "httpThreadNum:" << httpThreadNum << "; "
<< "httpQueueSize:" << httpQueueSize << "; "
<< "dpThreadNum:" << dpThreadNum << "; "
<< "loadThreadNum:" << loadThreadNum << "; "
<< "decisionLoopInterval:" << decisionLoopInterval << "; "
<< "decodeUri:" << decodeUri << "; "
<< "haCompatible:" << haCompatible << "; "
<< "httpIoThreadNum:" << httpIoThreadNum << "; "
<< "enableSapSearch:" << enableSapSearch << "; "
<< "allowPartialTableReady:" << allowPartialTableReady << "; "
<< "needShutdownGracefully:" << needShutdownGracefully << "; "
<< "noDiskQuotaCheck:" << noDiskQuotaCheck << "; "
<< "serviceName:" << serviceName << "; "
<< "amonitorPort:" << amonitorPort << "; "
<< "amonitorPath:" << amonitorPath << "; "
<< "amonitorPathStyle:" << amonitorPathStyle << "; "
<< "roleType:" << roleType << "; "
<< "roleName:" << roleName << "; "
<< "zoneName:" << zoneName << "; "
<< "partId:" << partId << "; "
<< "hippoSlaveIp:" << hippoSlaveIp << "; "
<< "hippoDp2SlavePort:" << hippoDp2SlavePort << "; "
<< "carbonIdentifier:" << carbonIdentifier << "; "
<< "procInstanceId:" << procInstanceId << "; "
<< "packageCheckSum:" << packageCheckSum << "; "
<< "discardMetricSampleInterval:" << discardMetricSampleInterval << "; "
<< "reportIndexStatusInterval:" << reportLoopInterval << "; "
<< "kmonitorPort:" << kmonitorPort << "; "
<< "kmonitorSinkAddress:" << kmonitorSinkAddress << "; "
<< "kmonitorEnableLogFileSink:" << kmonitorEnableLogFileSink << "; "
<< "kmonitorEnablePrometheusSink:" << kmonitorEnablePrometheusSink << "; "
<< "kmonitorManuallyMode:" << kmonitorManuallyMode << "; "
<< "kmonitorServiceName:" << kmonitorServiceName << "; "
<< "kmonitorTenant:" << kmonitorTenant << "; "
<< "kmonitorMetricsPrefix:" << kmonitorMetricsPrefix << "; "
<< "kmonitorGlobalTableMetricsPrefix:" << kmonitorGlobalTableMetricsPrefix << "; "
<< "kmonitorTableMetricsPrefix:" << kmonitorTableMetricsPrefix << "; "
<< "kmonitorMetricsReporterCacheLimit:" << kmonitorMetricsReporterCacheLimit << "; "
<< "kmonitorTags:" << tagsToString(kmonitorTags) << "; "
<< "asyncInterExecutorThreadNum:" << asyncInterExecutorThreadNum << "; "
<< "asyncIntraExecutorThreadNum:" << asyncIntraExecutorThreadNum << "; "
<< "asyncInterExecutorType:" << asyncInterExecutorType << "; "
<< "asyncIntraExecutorType:" << asyncIntraExecutorType << "; "
<< "debugMode:" << debugMode << "; "
<< "localMode:" << localMode << "; "
<< "rdmaPort:" << rdmaPort << "; "
<< "rdmaRpcThreadNum:" << rdmaRpcThreadNum << "; "
<< "rdmaRpcQueueSize:" << rdmaRpcQueueSize << "; "
<< "rdmaWorkerThreadNum:" << rdmaWorkerThreadNum << "; "
<< "rdmaIoThreadNum:" << rdmaIoThreadNum << "; ";
return ss.str();
}
bool EnvParam::check() {
if (localMode) {
AUTIL_LOG(INFO, "deploy in local mode, do not need dp2 resource");
return true;
}
uint32_t port;
if (hippoDp2SlavePort.empty() || !StringUtil::strToUInt32(hippoDp2SlavePort.c_str(), port)) {
AUTIL_LOG(ERROR, "invalid dp2 port [%s]", hippoDp2SlavePort.c_str());
return false;
}
return true;
}
} // namespace suez