in inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java [380:587]
/**
 * Reads the optional common settings from {@code props} into the matching fields
 * and then instantiates the manager IP list parser.
 *
 * <p>A field is only overwritten when its key carries a value that passes the guard
 * for that key; otherwise the field keeps whatever value it held before this call.
 * Numeric settings with a lower bound are only applied when the parsed value is not
 * below the bound belonging to that same setting.
 *
 * <p>The last step loads the class named by {@code managerType}, creates it through
 * its no-arg constructor and hands it {@code props}. Any failure in that step is
 * logged and terminates the JVM with exit status 6.
 */
private void preReadFields() {
    String tmpValue;
    // read cluster tag
    tmpValue = this.props.get(KEY_PROXY_CLUSTER_TAG);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.clusterTag = tmpValue.trim();
    }
    // read cluster name
    tmpValue = this.props.get(KEY_PROXY_CLUSTER_NAME);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.clusterName = tmpValue.trim();
    }
    // read cluster incharges
    tmpValue = this.props.get(KEY_PROXY_CLUSTER_INCHARGES);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.clusterIncharges = tmpValue.trim();
    }
    // read cluster ext tag
    tmpValue = this.props.get(KEY_PROXY_CLUSTER_EXT_TAG);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.clusterExtTag = tmpValue.trim();
    }
    // read configure sync interval; fall back to the config-check interval key
    // when the meta-config key has no usable value
    tmpValue = this.props.get(KEY_META_CONFIG_SYNC_INTERVAL_MS);
    if (StringUtils.isBlank(tmpValue)) {
        tmpValue = this.props.get(KEY_CONFIG_CHECK_INTERVAL_MS);
    }
    if (StringUtils.isNotEmpty(tmpValue)) {
        long tmpSyncInvMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_CONFIG_SYNC_INTERVAL_MS);
        if (tmpSyncInvMs >= VAL_MIN_CONFIG_SYNC_INTERVAL_MS) {
            this.metaConfigSyncInvlMs = tmpSyncInvMs;
        }
    }
    // read enable startup using local meta file
    tmpValue = this.props.get(KEY_ENABLE_STARTUP_USING_LOCAL_META_FILE);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableStartupUsingLocalMetaFile = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read whether accept msg without id2topic configure
    tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableUnConfigTopicAccept = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read default topics (whitespace separated list)
    tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
    if (StringUtils.isNotBlank(tmpValue)) {
        List<String> tmpList = new ArrayList<>();
        String[] topicItems = tmpValue.split("\\s+");
        for (String item : topicItems) {
            // leading whitespace in the value yields an empty first item
            if (StringUtils.isBlank(item)) {
                continue;
            }
            tmpList.add(item.trim());
        }
        if (!tmpList.isEmpty()) {
            defaultTopics = tmpList;
        }
    }
    // read enable whitelist
    tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableWhiteList = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read manager type
    tmpValue = this.props.get(KEY_MANAGER_TYPE);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.managerType = tmpValue.trim();
    }
    // read manager auth secret id
    tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_ID);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.managerAuthSecretId = tmpValue.trim();
    }
    // read manager auth secret key
    tmpValue = this.props.get(KEY_MANAGER_AUTH_SECRET_KEY);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.managerAuthSecretKey = tmpValue.trim();
    }
    // read whether enable file metric
    tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read file metric statistic interval
    tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
    if (StringUtils.isNotEmpty(tmpValue)) {
        int statInvl = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
        // validate against the interval's own lower bound, not the cache count's
        if (statInvl >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
            this.fileMetricStatInvlSec = statInvl;
        }
    }
    // read file metric statistic max cache count
    tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
    if (StringUtils.isNotEmpty(tmpValue)) {
        int maxCacheCnt = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
        // validate against the cache count's own lower bound, not the interval's
        if (maxCacheCnt >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
            this.fileMetricStatCacheCnt = maxCacheCnt;
        }
    }
    // read source file statistic output name
    tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.fileMetricSourceOutName = tmpValue.trim();
    }
    // read sink file statistic output name
    tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.fileMetricSinkOutName = tmpValue.trim();
    }
    // read event file statistic output name
    tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.fileMetricEventOutName = tmpValue.trim();
    }
    // read whether enable audit
    tmpValue = this.props.get(KEY_ENABLE_AUDIT);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableAudit = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read audit proxys (whitespace separated list, appended to the existing collection)
    tmpValue = this.props.get(KEY_AUDIT_PROXYS);
    if (StringUtils.isNotBlank(tmpValue)) {
        String[] ipPorts = tmpValue.split("\\s+");
        for (String tmpIPPort : ipPorts) {
            if (StringUtils.isBlank(tmpIPPort)) {
                continue;
            }
            this.auditProxys.add(tmpIPPort.trim());
        }
    }
    // read audit file path
    tmpValue = this.props.get(KEY_AUDIT_FILE_PATH);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.auditFilePath = tmpValue.trim();
    }
    // read audit max cache rows
    tmpValue = this.props.get(KEY_AUDIT_MAX_CACHE_ROWS);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.auditMaxCacheRows = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_AUDIT_MAX_CACHE_ROWS);
    }
    // read audit format interval
    tmpValue = this.props.get(KEY_AUDIT_FORMAT_INTERVAL_MS);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.auditFormatInvlMs = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_AUDIT_FORMAT_INTERVAL_MS);
    }
    // read whether response after save
    tmpValue = this.props.get(KEY_RESPONSE_AFTER_SAVE);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.responseAfterSave = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read max response after save timeout
    tmpValue = this.props.get(KEY_MAX_RAS_TIMEOUT_MS);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.maxResAfterSaveTimeout = NumberUtils.toLong(tmpValue.trim(), VAL_DEF_MAX_RAS_TIMEOUT_MS);
    }
    // read max bufferqueue size
    tmpValue = this.props.get(KEY_MAX_BUFFERQUEUE_SIZE_KB);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.maxBufferQueueSizeKb = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB);
    }
    // read event handler
    tmpValue = this.props.get(KEY_EVENT_HANDLER);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.eventHandler = tmpValue.trim();
    }
    // read cache cluster selector
    tmpValue = this.props.get(KEY_CACHE_CLUSTER_SELECTOR);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.cacheClusterSelector = tmpValue.trim();
    }
    // read proxy node id
    tmpValue = this.props.get(KEY_PROXY_NODE_ID);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.proxyNodeId = tmpValue.trim();
    }
    // read msg compress type
    tmpValue = this.props.get(KEY_MSG_SENT_COMPRESS_TYPE);
    if (StringUtils.isNotBlank(tmpValue)) {
        this.msgCompressType = tmpValue.trim();
    }
    // read prometheus Http Port
    tmpValue = this.props.get(KEY_PROMETHEUS_HTTP_PORT);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_PROMETHEUS_HTTP_PORT);
    }
    // read whether retry send message after sent failure
    tmpValue = this.props.get(KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
    if (StringUtils.isNotEmpty(tmpValue)) {
        this.enableSendRetryAfterFailure = "TRUE".equalsIgnoreCase(tmpValue.trim());
    }
    // read max retry count; negative values are ignored
    tmpValue = this.props.get(KEY_MAX_RETRIES_AFTER_FAILURE);
    if (StringUtils.isNotBlank(tmpValue)) {
        int retries = NumberUtils.toInt(tmpValue.trim(), VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
        if (retries >= 0) {
            this.maxRetriesAfterFailure = retries;
        }
    }
    // initial ip parser
    try {
        // asSubclass performs a checked narrowing, so a class that is not an
        // IManagerIpListParser is rejected here instead of via an unchecked cast
        Class<? extends IManagerIpListParser> ipListParserClass =
                Class.forName(this.managerType).asSubclass(IManagerIpListParser.class);
        this.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance();
        this.ipListParser.setCommonProperties(this.props);
    } catch (Throwable t) {
        // the parser is mandatory: log the cause and stop the process
        LOG.error("Initial ipListParser Class {} failure, exit!", this.managerType, t);
        System.exit(6);
    }
}