aios/apps/facility/swift/config/PartitionConfig.h (229 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <stdint.h>
#include <string>
#include <vector>
#include "autil/Log.h"
#include "swift/common/Common.h"
#include "swift/protocol/Common.pb.h"
namespace swift {
namespace config {
static const int64_t MIN_PARTITION_BUFFER_SIZE = 1 << 20; // 1M
static const int64_t MIN_BLOCK_SIZE = 1 << 12; // 4k
static const int64_t MIN_MAX_COMMIT_SIZE = 1 << 20; // 1M
static const int64_t MIN_MAX_COMMIT_INTERVAL = 1000; // 1ms
static const int64_t MIN_CLOSE_NO_WRITE_FILE_INTERVAL = 60 * 1000 * 1000; // 1MIN
static const int64_t MIN_MAX_FILE_SIZE = 1 << 26; // 64M
static const int64_t MIN_MAX_FILE_SPLIT_INTERVAL = 60 * 1000 * 1000; // 1min
static const int64_t MIN_MIN_CHANGE_FILE_FOR_DFS_ERROR_TIME = 60 * 1000 * 1000; // 1min
static const int64_t MIN_MAX_COMMIT_TIME_AS_ERROR = 1000 * 1000; // 1s
static const int64_t MIN_OBSOLETE_FILE_TIME_INTERVAL = 600 * 1000 * 1000; // 10min
static const int32_t MIN_RESERVED_FILE_COUNT = 5;
static const int32_t MAX_WAIT_TIME_FOR_SECURITY_COMMIT = 50 * 1000; // 50ms
static const int32_t MAX_DATA_SIZE_FOR_SECURITY_COMMIT = 1 << 20; // 1M
class PartitionConfig {
public:
PartitionConfig();
~PartitionConfig();
public:
void setDataRoot(const std::string &dataRoot) { _dataRoot = dataRoot; }
const std::string &getDataRoot() const { return _dataRoot; }
void setPartitionMinBufferSize(int64_t value) {
if (!_configUnlimited) {
_partitionMinBufferSize = std::max(value, MIN_PARTITION_BUFFER_SIZE);
} else {
_partitionMinBufferSize = value;
}
}
void setPartitionMaxBufferSize(int64_t value) {
if (!_configUnlimited) {
_partitionMaxBufferSize = std::max(value, MIN_PARTITION_BUFFER_SIZE);
} else {
_partitionMaxBufferSize = value;
}
}
void setBlockSize(int64_t value) {
if (!_configUnlimited) {
_blockSize = std::max(value, MIN_BLOCK_SIZE);
} else {
_blockSize = value;
}
}
void setMaxCommitSize(int64_t value) {
if (!_configUnlimited) {
_maxCommitSize = std::max(value, MIN_MAX_COMMIT_SIZE);
} else {
_maxCommitSize = value;
}
}
void setMaxCommitInterval(int64_t value) {
if (!_configUnlimited) {
_maxCommitInterval = std::max(value, MIN_MAX_COMMIT_INTERVAL);
} else {
_maxCommitInterval = value;
}
}
void setMaxCommitIntervalForMemoryPreferTopic(int64_t value) {
if (!_configUnlimited) {
_maxCommitIntervalForMemoryPreferTopic = std::max(value, MIN_MAX_COMMIT_INTERVAL);
} else {
_maxCommitIntervalForMemoryPreferTopic = value;
}
}
void setMaxCommitIntervalWhenDelay(int64_t value) {
if (!_configUnlimited) {
_maxCommitIntervalWhenDelay = std::max(value, MIN_MAX_COMMIT_INTERVAL);
} else {
_maxCommitIntervalWhenDelay = value;
}
}
void setMaxFileSize(int64_t value) {
if (!_configUnlimited) {
_maxFileSize = std::max(value, MIN_MAX_FILE_SIZE);
} else {
_maxFileSize = value;
}
}
void setMinFileSize(int64_t value) { _minFileSize = value; }
void setMaxFileSplitInterval(int64_t value) {
if (!_configUnlimited) {
_maxFileSplitTime = std::max(value, MIN_MAX_FILE_SPLIT_INTERVAL);
} else {
_maxFileSplitTime = value;
}
}
void setCloseNoWriteFileInterval(int64_t value) {
if (!_configUnlimited) {
_closeNoWriteFileInterval = std::max(value, MIN_CLOSE_NO_WRITE_FILE_INTERVAL);
} else {
_closeNoWriteFileInterval = value;
}
}
void setMinChangeFileForDfsErrorTime(int64_t value) {
if (!_configUnlimited) {
_minChangeFileForDfsErrorTime = std::max(value, MIN_MIN_CHANGE_FILE_FOR_DFS_ERROR_TIME);
} else {
_minChangeFileForDfsErrorTime = value;
}
}
void setMaxCommitTimeAsError(int64_t value) {
if (!_configUnlimited) {
_maxCommitTimeAsError = std::max(value, MIN_MAX_COMMIT_TIME_AS_ERROR);
} else {
_maxCommitTimeAsError = value;
}
}
void setConfigUnlimited(bool value) { _configUnlimited = value; }
void setTopicMode(protocol::TopicMode topicMode) { _topicMode = topicMode; }
void setNeedFieldFilter(bool needFieldFilter) { _needFieldFilter = needFieldFilter; }
void setObsoleteFileTimeInterval(int64_t timeIntervalHour) {
if (!_configUnlimited) {
_obsoleteFileTimeInterval = std::max(timeIntervalHour, MIN_OBSOLETE_FILE_TIME_INTERVAL);
} else {
_obsoleteFileTimeInterval = timeIntervalHour;
}
}
void setReservedFileCount(int32_t reservedFileCount) {
if (!_configUnlimited) {
_reservedFileCount = std::max(reservedFileCount, MIN_RESERVED_FILE_COUNT);
} else {
_reservedFileCount = reservedFileCount;
}
}
void setDelObsoleteFileInterval(int64_t timeIntervalSec) { _delObsoleteFileInterval = timeIntervalSec; }
void setCandidateObsoleteFileInterval(int64_t timeIntervalSec) { _candidateObsoleteFileInterval = timeIntervalSec; }
int64_t getCandidateObsoleteFileInterval() const { return _candidateObsoleteFileInterval; }
int64_t getDelObsoleteFileInterval() const { return _delObsoleteFileInterval; }
int32_t getReservedFileCount() const { return _reservedFileCount; }
int64_t getObsoleteFileTimeInterval() const { return _obsoleteFileTimeInterval; }
protocol::TopicMode getTopicMode() const { return _topicMode; }
bool needFieldFilter() const { return _needFieldFilter; }
bool getConfigUnlimited() const { return _configUnlimited; }
int64_t getPartitionMinBufferSize() const { return _partitionMinBufferSize; }
int64_t getPartitionMaxBufferSize() const { return _partitionMaxBufferSize; }
int64_t getBlockSize() const { return _blockSize; }
int64_t getMaxCommitSize() const { return _maxCommitSize; }
int64_t getMaxCommitInterval() const { return _maxCommitInterval; }
int64_t getMaxCommitIntervalForMemoryPreferTopic() const { return _maxCommitIntervalForMemoryPreferTopic; }
int64_t getMaxCommitIntervalWhenDelay() const { return _maxCommitIntervalWhenDelay; }
int64_t getMaxFileSize() const { return _maxFileSize; }
int64_t getMinFileSize() const { return _minFileSize; }
int64_t getMaxFileSplitInterval() const { return _maxFileSplitTime; }
int64_t getCloseNoWriteFileInterval() const { return _closeNoWriteFileInterval; }
int64_t getMinChangeFileForDfsErrorTime() const { return _minChangeFileForDfsErrorTime; }
int64_t getMaxCommitTimeAsError() const { return _maxCommitTimeAsError; }
void setMaxMessageCountInOneFile(int64_t maxMessageCountInOneFile) {
_maxMessageCountInOneFile = maxMessageCountInOneFile;
}
int64_t getMaxMessageCountInOneFile() const { return _maxMessageCountInOneFile; }
void setCacheMetaCount(uint32_t cacheMetaCount) { _cacheMetaCount = cacheMetaCount; }
uint32_t getCacheMetaCount() const { return _cacheMetaCount; }
void setMaxWaitTimeForSecurityCommit(int64_t waitTime) { _maxWaitTimeForSecurityCommit = waitTime; }
int64_t getMaxWaitTimeForSecurityCommit() const { return _maxWaitTimeForSecurityCommit; }
void setMaxDataSizeForSecurityCommit(int64_t dataSize) { _maxDataSizeForSecurityCommit = dataSize; }
int64_t getMaxDataSizeForSecurityCommit() const { return _maxDataSizeForSecurityCommit; }
void setCompressMsg(bool compressMsg) { _compressMsg = compressMsg; }
bool compressMsg() const { return _compressMsg; }
void setCompressThres(int thres) { _compressThres = thres; }
int getCompressThres() const { return _compressThres; }
void setExtendDataRoots(const std::vector<std::string> &dataRoot) { _extendDataRoots = dataRoot; }
const std::vector<std::string> &getExtendDataRoots() const { return _extendDataRoots; }
void setRecyclePercent(double percent) { _recyclePercent = percent; }
double getRecyclePercent() const { return _recyclePercent; }
void setCheckFieldFilterMsg(bool flag) { _checkFieldFilterMsg = flag; }
bool checkFieldFilterMsg() const { return _checkFieldFilterMsg; }
void setObsoleteReaderInterval(int64_t timeInterval) { _obsoleteReaderInterval = timeInterval; }
int64_t getObsoleteReaderInterval() const { return _obsoleteReaderInterval; }
void setObsoleteReaderMetricInterval(int64_t timeInterval) { _obsoleteReaderMetricInterval = timeInterval; }
int64_t getObsoleteReaderMetricInterval() const { return _obsoleteReaderMetricInterval; }
void setMaxReadSizeSec(int64_t sizeMB) { _maxReadSizeSec = sizeMB << 20; }
int64_t getMaxReadSizeSec() const { return _maxReadSizeSec; }
void setEnableLongPolling(bool isEnable) { _enableLongPolling = isEnable; }
bool enableLongPolling() const { return _enableLongPolling; }
void setReadNotCommittedMsg(bool isRead) { _readNotCommittedMsg = isRead; }
bool readNotCommittedMsg() const { return _readNotCommittedMsg; }
void setTimestampOffsetInUs(int64_t offset) { _timestampOffsetInUs = offset; }
int64_t getTimestampOffsetInUs() const { return _timestampOffsetInUs; }
private:
std::string _dataRoot;
int64_t _partitionMinBufferSize;
int64_t _partitionMaxBufferSize;
int64_t _blockSize;
int64_t _maxCommitSize; // byte
int64_t _maxCommitInterval; // us
int64_t _maxCommitIntervalForMemoryPreferTopic; // us
int64_t _maxCommitIntervalWhenDelay; // us
int64_t _maxFileSize; // byte
int64_t _minFileSize; // byte
int64_t _maxFileSplitTime; // us
int64_t _closeNoWriteFileInterval; // us
int64_t _minChangeFileForDfsErrorTime; // us
int64_t _maxCommitTimeAsError; // us
bool _configUnlimited;
protocol::TopicMode _topicMode;
bool _needFieldFilter;
int64_t _obsoleteFileTimeInterval;
int32_t _reservedFileCount;
int64_t _delObsoleteFileInterval;
int64_t _candidateObsoleteFileInterval;
int64_t _maxMessageCountInOneFile;
uint32_t _cacheMetaCount;
int64_t _maxWaitTimeForSecurityCommit;
int64_t _maxDataSizeForSecurityCommit;
bool _compressMsg;
int _compressThres;
std::vector<std::string> _extendDataRoots;
double _recyclePercent;
bool _checkFieldFilterMsg;
int64_t _obsoleteReaderInterval;
int64_t _obsoleteReaderMetricInterval;
int64_t _maxReadSizeSec;
bool _enableLongPolling;
bool _readNotCommittedMsg;
int64_t _timestampOffsetInUs;
private:
AUTIL_LOG_DECLARE();
};
SWIFT_TYPEDEF_PTR(PartitionConfig);
} // namespace config
} // namespace swift