agent/flagging/configuration.go (453 lines of code) (raw):
package flagging
import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"

	"github.com/aliyun/aliyun_assist_client/common/fileutil"
	"github.com/aliyun/aliyun_assist_client/common/httpbase"
	"github.com/aliyun/aliyun_assist_client/common/metaserver"
	"github.com/aliyun/aliyun_assist_client/common/pathutil"
	"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
)
// Names of the configuration items. They are the keys recognized in the
// configuration file content and in the content provisioned by metaserver.
const (
	UPDATOR_BOOTSTRAP_UPDATE   = "updator.bootstrapUpdate"
	UPDATOR_UPDATE             = "updator.update"
	TASK_CONCURRENCY_HARDLIMIT = "task.concurrencyHardLimit"
	TASK_KEEP_SCRIPT_FILE      = "task.keepScriptFile"
	RESOURCE_CPU_LIMIT         = "resource.cpuLimit"
	RESOURCE_MEM_LIMIT         = "resource.memLimit"
	RESOURCE_OVERLOAD_LIMIT    = "resource.overloadLimit"
	APISERVER_TRY_PRESETS      = "apiserver.tryPreset"
	ASSIST_DAEMON_ACTIVE       = "assistDaemon.active"
)
// Default values of the configuration items. They apply whenever an item is
// overridden neither by metaserver provision content, nor by a configuration
// file, nor by a legacy flag file.
const (
	DEFAULT_UPDATOR_BOOTSTRAP_UPDATE   = true
	DEFAULT_UPDATOR_UPDATE             = true
	DEFAULT_TASK_CONCURRENCY_HARDLIMIT = int64(500)
	DEFAULT_TASK_KEEP_SCRIPT_FILE      = false
	DEFAULT_RESOURCE_CPU_LIMIT         = float64(20.0)           // percent
	DEFAULT_RESOURCE_MEM_LIMIT         = int64(50 * 1024 * 1024) // bytes (50 MiB)
	DEFAULT_RESOURCE_OVERLOAD_LIMIT    = int64(3)
	DEFAULT_APISERVER_TRY_PRESETS      = true
	DEFAULT_ASSIST_DAEMON_ACTIVE       = true
)
const (
	// configurationFile is the name of the configuration file looked up in
	// both the current-version and the cross-version config directories.
	configurationFile = "aliyun-assist.conf"

	// Textual forms of boolean items in configuration content.
	VALUE_ENABLED  = "enabled"
	VALUE_DISABLED = "disabled"

	// Flag files of older agent versions that were used to prohibit
	// auto-update. Their presence is still honored for an item that has no
	// explicit configuration.
	disableUpdateFlagFilename          = "disable_update"
	disableBootstrapUpdateFlagFilename = "disable_bootstrap_update"
)
// Value is a typed configuration value that can be parsed from, and rendered
// to, the textual form used in configuration content.
type Value interface {
	// Get returns the underlying value (int64, float64 or bool for the
	// implementations in this file).
	Get() any
	// GetString renders the value in the textual form accepted by ParseAndSet.
	GetString() string
	// ParseAndSet parses s and stores the result only when set is true, so
	// calling it with set == false merely validates s.
	ParseAndSet(s string, set bool) error
	// Copy returns an independent copy of the value.
	Copy() Value
}

// Compile-time checks that the concrete value types implement Value.
var (
	_ Value = (*intVar)(nil)
	_ Value = (*floatVar)(nil)
	_ Value = (*boolVar)(nil)
)

// intVar is an integer configuration value written in base 10.
type intVar struct{ value int64 }

func newIntVar(v int64) *intVar { return &intVar{value: v} }

func (v *intVar) Get() any { return v.value }

func (v *intVar) GetValue() int64 { return v.value }

func (v *intVar) GetString() string { return fmt.Sprint(v.value) }

func (v *intVar) Copy() Value { return &intVar{value: v.value} }

// ParseAndSet parses s as a base-10 integer. The bit size is fixed at 64
// instead of the platform-dependent int size (bitSize 0), so that int64 items
// such as memory limits beyond 2 GiB are also accepted by 32-bit builds.
func (v *intVar) ParseAndSet(s string, set bool) error {
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return err
	}
	if set {
		v.value = n
	}
	return nil
}

// floatVar is a floating-point configuration value.
type floatVar struct{ value float64 }

func newFloatVar(v float64) *floatVar { return &floatVar{value: v} }

func (v *floatVar) Get() any { return v.value }

func (v *floatVar) GetValue() float64 { return v.value }

func (v *floatVar) GetString() string { return fmt.Sprint(v.value) }

func (v *floatVar) Copy() Value { return &floatVar{value: v.value} }

// ParseAndSet parses s as a 64-bit floating-point number.
func (v *floatVar) ParseAndSet(s string, set bool) error {
	n, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return err
	}
	if set {
		v.value = n
	}
	return nil
}

// boolVar is a boolean configuration value written as VALUE_ENABLED or
// VALUE_DISABLED.
type boolVar struct{ value bool }

func newBoolVar(v bool) *boolVar { return &boolVar{value: v} }

func (v *boolVar) Get() any { return v.value }

func (v *boolVar) GetValue() bool { return v.value }

func (v *boolVar) GetString() string {
	if v.value {
		return VALUE_ENABLED
	}
	return VALUE_DISABLED
}

func (v *boolVar) Copy() Value { return &boolVar{value: v.value} }

// ParseAndSet accepts exactly VALUE_ENABLED or VALUE_DISABLED; anything else
// yields errTypeNotMatch.
func (v *boolVar) ParseAndSet(s string, set bool) error {
	var n bool
	switch s {
	case VALUE_ENABLED:
		n = true
	case VALUE_DISABLED:
		n = false
	default:
		return errTypeNotMatch
	}
	if set {
		v.value = n
	}
	return nil
}

// agentConfig maps configuration item names to their typed values.
type agentConfig map[string]Value

// Callback applies a configuration item. RegisterCallbackAndApply invokes it
// with the item's raw value as returned by Value.Get.
type Callback func(logger logrus.FieldLogger, value any)
var (
	// _defaultConfig declares the type and default value of every known item.
	// DO NOT MODIFY _defaultConfig. Its values are pointers (reference types),
	// so always hand out Value.Copy() results to keep them from being modified
	// elsewhere.
	_defaultConfig agentConfig

	// _runtimeConfig is the configuration currently in effect.
	// Guarded by _runtimeConfigLock.
	_runtimeConfig     agentConfig
	_runtimeConfigLock sync.RWMutex

	// _callback maps item names to the callbacks that apply them.
	// Guarded by _callbackLock.
	_callback     map[string]Callback
	_callbackLock sync.Mutex

	// _configFileLock serializes access to the configuration files.
	_configFileLock sync.Mutex
)

var (
	errUnknownConfigName = errors.New("unknown config name")
	errTypeNotMatch      = errors.New("type mismatch")
)

// Typed views of the entries of _runtimeConfig, refreshed by updateRuntime
// whenever _runtimeConfig is rebuilt. Guarded by _runtimeConfigLock.
var (
	_RUNTIME_UPDATOR_BOOTSTRAP_UPDATE   *boolVar
	_RUNTIME_UPDATOR_UPDATE             *boolVar
	_RUNTIME_TASK_CONCURRENCY_HARDLIMIT *intVar
	_RUNTIME_TASK_KEEP_SCRIPT_FILE      *boolVar
	_RUNTIME_RESOURCE_CPU_LIMIT         *floatVar
	_RUNTIME_RESOURCE_MEM_LIMIT         *intVar
	_RUNTIME_RESOURCE_OVERLOAD_LIMIT    *intVar
	_RUNTIME_APISERVER_TRY_PRESETS      *boolVar
	_RUNTIME_ASSIST_DAEMON_ACTIVE       *boolVar
)
func init() {
_defaultConfig = make(agentConfig)
_runtimeConfig = make(agentConfig)
// _defaultConfig defines the data type and default value of each configuration item
_defaultConfig[UPDATOR_BOOTSTRAP_UPDATE] = newBoolVar(DEFAULT_UPDATOR_BOOTSTRAP_UPDATE)
_defaultConfig[UPDATOR_UPDATE] = newBoolVar(DEFAULT_UPDATOR_UPDATE)
_defaultConfig[TASK_CONCURRENCY_HARDLIMIT] = newIntVar(DEFAULT_TASK_CONCURRENCY_HARDLIMIT)
_defaultConfig[TASK_KEEP_SCRIPT_FILE] = newBoolVar(DEFAULT_TASK_KEEP_SCRIPT_FILE)
_defaultConfig[RESOURCE_CPU_LIMIT] = newFloatVar(DEFAULT_RESOURCE_CPU_LIMIT)
_defaultConfig[RESOURCE_MEM_LIMIT] = newIntVar(DEFAULT_RESOURCE_MEM_LIMIT)
_defaultConfig[RESOURCE_OVERLOAD_LIMIT] = newIntVar(DEFAULT_RESOURCE_OVERLOAD_LIMIT)
_defaultConfig[APISERVER_TRY_PRESETS] = newBoolVar(DEFAULT_APISERVER_TRY_PRESETS)
_defaultConfig[ASSIST_DAEMON_ACTIVE] = newBoolVar(DEFAULT_ASSIST_DAEMON_ACTIVE)
}
// InitConfig loads the complete configuration (defaults, metaserver content
// and configuration files) as the runtime configuration, discards every
// registered callback and refreshes the typed views read by the Get*
// accessors. The runtime write lock is held for the whole operation.
func InitConfig(logger logrus.FieldLogger) {
	_runtimeConfigLock.Lock()
	defer _runtimeConfigLock.Unlock()

	func() {
		_callbackLock.Lock()
		defer _callbackLock.Unlock()
		_runtimeConfig = loadAllConfig(logger)
		_callback = make(map[string]Callback)
	}()

	// Still under the runtime write lock, after the callback lock is released.
	updateRuntime()
}
func updateRuntime() {
_RUNTIME_UPDATOR_BOOTSTRAP_UPDATE = _runtimeConfig[UPDATOR_BOOTSTRAP_UPDATE].(*boolVar)
_RUNTIME_UPDATOR_UPDATE = _runtimeConfig[UPDATOR_UPDATE].(*boolVar)
_RUNTIME_TASK_CONCURRENCY_HARDLIMIT = _runtimeConfig[TASK_CONCURRENCY_HARDLIMIT].(*intVar)
_RUNTIME_TASK_KEEP_SCRIPT_FILE = _runtimeConfig[TASK_KEEP_SCRIPT_FILE].(*boolVar)
_RUNTIME_RESOURCE_CPU_LIMIT = _runtimeConfig[RESOURCE_CPU_LIMIT].(*floatVar)
_RUNTIME_RESOURCE_MEM_LIMIT = _runtimeConfig[RESOURCE_MEM_LIMIT].(*intVar)
_RUNTIME_RESOURCE_OVERLOAD_LIMIT = _runtimeConfig[RESOURCE_OVERLOAD_LIMIT].(*intVar)
_RUNTIME_APISERVER_TRY_PRESETS = _runtimeConfig[APISERVER_TRY_PRESETS].(*boolVar)
_RUNTIME_ASSIST_DAEMON_ACTIVE = _runtimeConfig[ASSIST_DAEMON_ACTIVE].(*boolVar)
}
// GetAllConf returns every configuration item rendered as a string.
// When runtime is true it reports the configuration currently in effect.
// Otherwise it loads the configuration afresh (defaults, metaserver content
// and configuration files) without changing the runtime configuration.
// The returned map is never nil.
func GetAllConf(logger logrus.FieldLogger, runtime bool) map[string]string {
	render := func(conf agentConfig) map[string]string {
		out := make(map[string]string, len(conf))
		for name, val := range conf {
			out[name] = val.GetString()
		}
		return out
	}

	if !runtime {
		return render(loadAllConfig(logger))
	}

	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	return render(_runtimeConfig)
}
// RegisterCallbackAndApply registers a callback for each named configuration
// item and immediately invokes it with the item's current runtime value
// (Value.Get). Unknown item names are logged and skipped. It always returns
// nil.
//
// NOTE(review): callbacks run while _runtimeConfigLock is read-locked. A
// callback calling the Get* accessors re-acquires the read lock recursively,
// which sync.RWMutex does not support and may deadlock if a writer is waiting.
func RegisterCallbackAndApply(logger logrus.FieldLogger, callbacks map[string]Callback) error {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	_callbackLock.Lock()
	defer _callbackLock.Unlock()

	for name, cb := range callbacks {
		current, known := _runtimeConfig[name]
		if !known {
			logger.Warning("Unknown configuration item: ", name)
			continue
		}
		_callback[name] = cb
		cb(logger, current.Get())
	}
	return nil
}
// UpdateConfigRuntime updates the runtime configuration and calls the
// registered callbacks to make the new configuration take effect.
//
// kvList is a flat list of alternating item names and values, e.g.
// []string{"updator.update", "disabled"}. An odd-length list is rejected.
// Every pair is validated before any of them is applied, so on error the
// runtime configuration is left unchanged. Errors are errUnknownConfigName
// for an unknown name, or the parse error of an invalid value.
//
// Each affected callback is invoked once, in the order its item first
// appears in kvList, with the item's final raw value (Value.Get).
//
// NOTE(review): callbacks run while _runtimeConfigLock is write-locked, so a
// callback must not call the Get* accessors of this package (deadlock).
func UpdateConfigRuntime(logger logrus.FieldLogger, kvList []string) error {
	if len(kvList)%2 != 0 {
		return fmt.Errorf("odd number of elements in key-value list: %d", len(kvList))
	}

	_runtimeConfigLock.Lock()
	defer _runtimeConfigLock.Unlock()
	_callbackLock.Lock()
	defer _callbackLock.Unlock()

	// Validate every pair first so that a bad pair leaves nothing half-applied.
	for i := 0; i < len(kvList); i += 2 {
		v, ok := _runtimeConfig[kvList[i]]
		if !ok {
			return errUnknownConfigName
		}
		if err := v.ParseAndSet(kvList[i+1], false); err != nil {
			return err
		}
	}

	var callbackKeys []string
	seen := make(map[string]bool)
	for i := 0; i < len(kvList); i += 2 {
		property, value := kvList[i], kvList[i+1]
		// Cannot fail: the same value was validated against the same Value above.
		_ = _runtimeConfig[property].ParseAndSet(value, true)
		if _, ok := _callback[property]; ok && !seen[property] {
			seen[property] = true
			callbackKeys = append(callbackKeys, property)
		}
	}

	for _, property := range callbackKeys {
		_callback[property](logger, _runtimeConfig[property].Get())
	}
	return nil
}
// UpdateConfigFile validates the given items and writes them into a
// configuration file, merged with the items already stored in that file.
//
// kvList is a flat list of alternating item names and values; an odd-length
// list is rejected. When crossVersion is true the cross-version configuration
// file is updated, otherwise the current-version one. Nothing is written if
// any name is unknown (errUnknownConfigName) or any value fails to parse.
// Items derived from legacy flag files in the same directory are also written
// out explicitly, because loadConfigFile merges them into the loaded content.
//
// NOTE(review): the file is read and written under two separate acquisitions
// of _configFileLock, so concurrent calls may lose each other's updates.
func UpdateConfigFile(logger logrus.FieldLogger, kvList []string, crossVersion bool) error {
	if len(kvList)%2 != 0 {
		return fmt.Errorf("odd number of elements in key-value list: %d", len(kvList))
	}

	updates := make(agentConfig, len(kvList)/2)
	for i := 0; i < len(kvList); i += 2 {
		property, value := kvList[i], kvList[i+1]
		def, ok := _defaultConfig[property]
		if !ok {
			return errUnknownConfigName
		}
		// Parse into a copy so that _defaultConfig itself is never modified.
		v := def.Copy()
		if err := v.ParseAndSet(value, true); err != nil {
			return err
		}
		updates[property] = v
	}

	conf := loadConfigFile(logger, !crossVersion, crossVersion)
	for k, v := range updates {
		conf[k] = v
	}
	return dumpConfigFile(logger, conf, crossVersion)
}
// ReloadConfig reloads the configuration (defaults, metaserver content and
// configuration files), overwrites every runtime item with the result and
// invokes each item's registered callback with the item's raw value
// (Value.Get), as RegisterCallbackAndApply and UpdateConfigRuntime do.
// The typed views used by the Get* accessors are refreshed afterwards, still
// under the runtime write lock.
//
// NOTE(review): callbacks run while _runtimeConfigLock is write-locked, so a
// callback must not call the Get* accessors of this package (deadlock).
func ReloadConfig(logger logrus.FieldLogger) {
	// Load before locking: loading may wait on the metaserver request.
	conf := loadAllConfig(logger)
	if len(conf) == 0 {
		return
	}
	_runtimeConfigLock.Lock()
	defer _runtimeConfigLock.Unlock()
	defer updateRuntime()
	_callbackLock.Lock()
	defer _callbackLock.Unlock()
	for k, v := range conf {
		_runtimeConfig[k] = v
		// TODO: call callback function only if _runtimeConfig[k] changed
		if f, ok := _callback[k]; ok {
			// Pass the raw value, not the Value wrapper: callbacks expect the
			// same argument they receive from the other entry points.
			f(logger, v.Get())
		}
	}
}
// loadAllConfig builds a complete configuration by layering, from lowest to
// highest priority: the defaults, the content provisioned by metaserver, and
// the configuration files as merged by loadConfigFile (cross-version,
// current-version and legacy flag files). The defaults are copied, so the
// result never aliases _defaultConfig.
func loadAllConfig(logger logrus.FieldLogger) agentConfig {
	merged := make(agentConfig, len(_defaultConfig))
	for name, def := range _defaultConfig {
		merged[name] = def.Copy()
	}

	// Later layers override earlier ones. The loaders run in slice order, so
	// metaserver is queried before the files are read.
	layers := []agentConfig{
		loadConfigFromMetaserver(logger),
		loadConfigFile(logger, true, true),
	}
	for _, layer := range layers {
		for name, val := range layer {
			merged[name] = val
		}
	}
	return merged
}
// parseConfigFile reads the file at confPath and parses its content with
// parseConfigContent.
//
// Configuration files are optional, so a file that does not exist yields an
// empty configuration and no error. Otherwise the caller would log an error
// on every load. Any other read error is returned with a nil configuration.
func parseConfigFile(confPath string) (agentConfig, error) {
	content, err := os.ReadFile(confPath)
	if errors.Is(err, os.ErrNotExist) {
		return make(agentConfig), nil
	}
	if err != nil {
		return nil, err
	}
	return parseConfigContent(string(content)), nil
}
// parseConfigContent parses configuration content made of "name=value" lines.
//
// Whitespace around each line, name and value is ignored, so "name = value"
// is accepted as well. The following lines are skipped silently:
//   - blank lines and lines starting with "#";
//   - lines without "=";
//   - lines with an empty value or an unknown item name;
//   - lines whose value does not parse for the item's type.
//
// When an item appears more than once, the last valid occurrence wins.
// Values are parsed into copies of the entries of _defaultConfig, so the
// result never aliases it.
func parseConfigContent(content string) agentConfig {
	conf := make(agentConfig)
	for _, rawLine := range strings.Split(content, "\n") {
		line := strings.TrimSpace(rawLine)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		key, value, found := strings.Cut(line, "=")
		if !found {
			continue
		}
		key, value = strings.TrimSpace(key), strings.TrimSpace(value)
		def, known := _defaultConfig[key]
		if !known || value == "" {
			continue
		}
		v := def.Copy()
		// An invalid value only discards the copy; _defaultConfig is untouched.
		if err := v.ParseAndSet(value, true); err != nil {
			continue
		}
		conf[key] = v
	}
	return conf
}
// loadConfigFile loads configuration items from the configuration files while
// holding _configFileLock.
//
// The cross-version file is read when crossVer is true, and the
// current-version file when currentVer is true. Items from the current
// version override those from the cross version. Items derived from legacy
// flag files (loadConfLegacy) are added only where no explicit item exists.
// A failure to locate or parse a file is logged and that file is skipped.
// The result is never nil.
func loadConfigFile(logger logrus.FieldLogger, currentVer, crossVer bool) agentConfig {
	_configFileLock.Lock()
	defer _configFileLock.Unlock()

	merged := make(agentConfig)

	// mergeFile parses configurationFile inside confDir (unless dirErr is set)
	// and copies its items into merged, overriding items already present.
	mergeFile := func(confDir string, dirErr error, dirErrMsg, parseErrMsg string) {
		if dirErr != nil {
			logger.WithError(dirErr).Error(dirErrMsg)
			return
		}
		parsed, err := parseConfigFile(filepath.Join(confDir, configurationFile))
		if err != nil {
			logger.WithError(err).Error(parseErrMsg)
			return
		}
		for name, val := range parsed {
			merged[name] = val
		}
	}

	// Cross version first, so that the current version takes precedence.
	if crossVer {
		confDir, err := pathutil.GetCrossVersionConfigPath()
		mergeFile(confDir, err, "Get cross version config path failed.", "Parse cross version config file failed.")
	}
	if currentVer {
		confDir, err := pathutil.GetConfigPath()
		mergeFile(confDir, err, "Get config path failed.", "Parse config file failed.")
	}

	for name, val := range loadConfLegacy(currentVer, crossVer) {
		if _, explicit := merged[name]; !explicit {
			merged[name] = val
		}
	}
	return merged
}
// dumpConfigFile writes conf as "name=value" lines to the configuration file.
// The file lives in the cross-version config directory when crossVer is true,
// and in the current-version one otherwise. It holds _configFileLock.
//
// Lines are sorted by item name, so rewriting the same configuration always
// yields the same file content, and the file ends with a newline. An existing
// file is truncated and overwritten. A new file is created with permission
// 0644 (before umask).
func dumpConfigFile(logger logrus.FieldLogger, conf agentConfig, crossVer bool) error {
	_configFileLock.Lock()
	defer _configFileLock.Unlock()

	var confDir string
	var err error
	if crossVer {
		confDir, err = pathutil.GetCrossVersionConfigPath()
	} else {
		confDir, err = pathutil.GetConfigPath()
	}
	if err != nil {
		logger.WithError(err).WithField("CrossVersion", crossVer).Error("Get config path failed.")
		return err
	}

	// Map iteration order is random; sort for a reproducible file.
	names := make([]string, 0, len(conf))
	for name := range conf {
		names = append(names, name)
	}
	sort.Strings(names)

	var b strings.Builder
	for _, name := range names {
		b.WriteString(name)
		b.WriteByte('=')
		b.WriteString(conf[name].GetString())
		b.WriteByte('\n')
	}

	confPath := filepath.Join(confDir, configurationFile)
	return os.WriteFile(confPath, []byte(b.String()), 0644)
}
// loadConfLegacy translates the flag files of older agent versions into
// configuration items.
//
// A disable_bootstrap_update file sets UPDATOR_BOOTSTRAP_UPDATE to false, and
// a disable_update file sets UPDATOR_UPDATE to false. The cross-version
// directory is checked when crossVer is true, and the current-version
// directory when currentVer is true. A directory whose path cannot be
// determined is skipped silently.
func loadConfLegacy(currentVer, crossVer bool) agentConfig {
	conf := make(agentConfig)

	// Each legacy flag file and the item it disables, in checking order.
	flagItems := []struct{ file, item string }{
		{disableBootstrapUpdateFlagFilename, UPDATOR_BOOTSTRAP_UPDATE},
		{disableUpdateFlagFilename, UPDATOR_UPDATE},
	}
	checkDir := func(confDir string, err error) {
		if err != nil {
			return
		}
		for _, f := range flagItems {
			if fileutil.CheckFileIsExist(filepath.Join(confDir, f.file)) {
				conf[f.item] = newBoolVar(false)
			}
		}
	}

	if crossVer {
		checkDir(pathutil.GetCrossVersionConfigPath())
	}
	if currentVer {
		checkDir(pathutil.GetConfigPath())
	}
	return conf
}
// loadConfigFromMetaserver fetches the agent provision content from the
// metaserver API (requested with httpbase.WithTimeoutInSeconds(2)) and
// parses it with parseConfigContent. On failure the error is logged and an
// empty, non-nil configuration is returned.
func loadConfigFromMetaserver(logger logrus.FieldLogger) (conf agentConfig) {
	content, err := metaserver.GetAgentProvision(logger, httpbase.WithTimeoutInSeconds(2))
	if err != nil {
		logger.WithError(err).Error("Get config from metaserver failed")
		return make(agentConfig)
	}
	logger.Info("Get config content from metaserver: ", content)
	return parseConfigContent(content)
}
// GetUpdatorBootstrapUpdate returns the runtime value of
// UPDATOR_BOOTSTRAP_UPDATE.
//
// Like every accessor below, it returns the item's default value if InitConfig
// has not populated the runtime views yet, instead of dereferencing a nil
// pointer.
func GetUpdatorBootstrapUpdate() bool {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_UPDATOR_BOOTSTRAP_UPDATE == nil {
		return DEFAULT_UPDATOR_BOOTSTRAP_UPDATE
	}
	return _RUNTIME_UPDATOR_BOOTSTRAP_UPDATE.GetValue()
}

// GetUpdatorUpdate returns the runtime value of UPDATOR_UPDATE, or its
// default before InitConfig.
func GetUpdatorUpdate() bool {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_UPDATOR_UPDATE == nil {
		return DEFAULT_UPDATOR_UPDATE
	}
	return _RUNTIME_UPDATOR_UPDATE.GetValue()
}

// GetTaskConcurrencyHardlimit returns the runtime value of
// TASK_CONCURRENCY_HARDLIMIT, or its default before InitConfig.
func GetTaskConcurrencyHardlimit() int64 {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_TASK_CONCURRENCY_HARDLIMIT == nil {
		return DEFAULT_TASK_CONCURRENCY_HARDLIMIT
	}
	return _RUNTIME_TASK_CONCURRENCY_HARDLIMIT.GetValue()
}

// GetTaskKeepScriptFile returns the runtime value of TASK_KEEP_SCRIPT_FILE,
// or its default before InitConfig.
func GetTaskKeepScriptFile() bool {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_TASK_KEEP_SCRIPT_FILE == nil {
		return DEFAULT_TASK_KEEP_SCRIPT_FILE
	}
	return _RUNTIME_TASK_KEEP_SCRIPT_FILE.GetValue()
}

// GetResourceCpuLimit returns the runtime value of RESOURCE_CPU_LIMIT, or its
// default before InitConfig.
func GetResourceCpuLimit() float64 {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_RESOURCE_CPU_LIMIT == nil {
		return DEFAULT_RESOURCE_CPU_LIMIT
	}
	return _RUNTIME_RESOURCE_CPU_LIMIT.GetValue()
}

// GetResourceMemLimit returns the runtime value of RESOURCE_MEM_LIMIT, or its
// default before InitConfig.
func GetResourceMemLimit() int64 {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_RESOURCE_MEM_LIMIT == nil {
		return DEFAULT_RESOURCE_MEM_LIMIT
	}
	return _RUNTIME_RESOURCE_MEM_LIMIT.GetValue()
}

// GetResourceOverloadLimit returns the runtime value of
// RESOURCE_OVERLOAD_LIMIT, or its default before InitConfig.
func GetResourceOverloadLimit() int64 {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_RESOURCE_OVERLOAD_LIMIT == nil {
		return DEFAULT_RESOURCE_OVERLOAD_LIMIT
	}
	return _RUNTIME_RESOURCE_OVERLOAD_LIMIT.GetValue()
}

// GetApiserverTryPreset returns the runtime value of APISERVER_TRY_PRESETS,
// or its default before InitConfig.
func GetApiserverTryPreset() bool {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_APISERVER_TRY_PRESETS == nil {
		return DEFAULT_APISERVER_TRY_PRESETS
	}
	return _RUNTIME_APISERVER_TRY_PRESETS.GetValue()
}

// GetAssistDaemonActive returns the runtime value of ASSIST_DAEMON_ACTIVE, or
// its default before InitConfig.
func GetAssistDaemonActive() bool {
	_runtimeConfigLock.RLock()
	defer _runtimeConfigLock.RUnlock()
	if _RUNTIME_ASSIST_DAEMON_ACTIVE == nil {
		return DEFAULT_ASSIST_DAEMON_ACTIVE
	}
	return _RUNTIME_ASSIST_DAEMON_ACTIVE.GetValue()
}