agent/statemanager/statemanager.go (315 lines of code) (raw):
package statemanager
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"runtime"
"runtime/debug"
"sync"
"time"
"github.com/aliyun/aliyun_assist_client/agent/clientreport"
"github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus"
"github.com/aliyun/aliyun_assist_client/agent/inventory/gatherers/instance"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/agent/statemanager/resources"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/timermanager"
"github.com/aliyun/aliyun_assist_client/agent/util/timetool"
)
const (
// DefaultRefreshIntervalSeconds is the default interval of refresh state configurations in seconds
DefaultRefreshIntervalSeconds = 1800
// MaxInitTimerDriftSeconds is the default max seconds to delay when initializing state manager timer
MaxInitTimerDriftSeconds = 3 * 60
)
const (
// Agent applies the configuration and does nothing further
// unless the configuration (template and parameters) is updated.
// After initial application of a new configuration,
// agent does not check for drift from a previously configured state.
// Agent will attempt to apply the configuration until it is successful before ApplyOnly takes effect.
ApplyOnly = "ApplyOnly"
// Agent applies any new configurations.
// After initial application of a new configuration, if the instance drifts from the desired state,
// reports the discrepancy to server.
// Agent will attempt to apply the configuration until it is successful before ApplyAndMonitor takes effect.
ApplyAndMonitor = "ApplyAndMonitor"
// Agent applies any new configurations. After initial application of a new configuration,
// if the instance drifts from the desired state, reports the discrepancy to server,
// and then re-applies the current configuration.
ApplyAndAutoCorrect = "ApplyAndAutoCorrect"
)
const (
Apply = "Apply"
Monitor = "Monitor"
Skip = "Skip"
)
const (
Compliant = "Compliant"
NotCompliant = "NotCompliant"
Failed = "Failed"
)
var (
_stateManageTimer *timermanager.Timer
_statemanageTimerInitLock sync.Mutex
refreshIntervalSeconds = DefaultRefreshIntervalSeconds
_stateConfigsLock sync.RWMutex
stateConfigs map[string]StateConfiguration
)
func updateStateConfigs(configs []StateConfiguration) {
_stateConfigsLock.Lock()
defer _stateConfigsLock.Unlock()
stateConfigs = map[string]StateConfiguration{}
for _, config := range configs {
stateConfigs[config.StateConfigurationId] = config
}
}
func getStateConfig(stateConfigId string) (config StateConfiguration, ok bool) {
_stateConfigsLock.RLock()
defer _stateConfigsLock.RUnlock()
config, ok = stateConfigs[stateConfigId]
return
}
// refreshStateConfigs pulls state configurations from server and refresh state configuration timers
func refreshStateConfigs() {
defer func() {
if panicPayload := recover(); panicPayload != nil {
stacktrace := debug.Stack()
fmt.Println(string(stacktrace))
clientreport.ReportPanic(panicPayload, stacktrace, false)
}
}()
log.GetLogger().Info("refresh state configurations")
cachedResult, err := LoadConfigCache()
var lastCheckpoint string
var lastCheckTime time.Time
if err != nil {
log.GetLogger().WithError(err).Error("load local state configuration fail")
}
if cachedResult != nil {
lastCheckpoint = cachedResult.Checkpoint
log.GetLogger().Debugf("last state configuration checkpoint: %s", lastCheckpoint)
lastCheckTime, err = timetool.ParseApiTime(lastCheckpoint)
} else {
lastCheckpoint = ""
}
result := cachedResult
// 如果是刚刚拉取过则使用缓存
if time.Now().Sub(lastCheckTime).Minutes() > 1 {
info, err := instance.GetInstanceInfo()
log.GetLogger().Debugf("instance information: %s", info)
if err != nil {
log.GetLogger().WithError(err).Error("get instance info failed")
return
}
resp, err := ListInstanceStateConfigurations(lastCheckpoint, info.AgentName, info.AgentVersion,
info.ComputerName, info.PlatformName, info.PlatformType, info.PlatformVersion,
info.IpAddress, info.RamRole)
if err != nil {
if resp != nil && resp.ErrCode == "ServiceNotSupported" {
log.GetLogger().Warn("state manager feature is not supported in current region")
oneDay := 24 * 60 * 60
if refreshIntervalSeconds != oneDay {
refreshIntervalSeconds = oneDay
CancelStateManagerTimer()
InitStateManagerTimer()
}
return
}
log.GetLogger().WithError(err).Error("fail to list state configurations")
return
}
if resp.Result != nil && resp.Result.Changed {
// 未变更的情况下,服务端没有完整返回配置,使用缓存
result = resp.Result
WriteConfigCache(result)
}
}
var targetInterval = result.Interval
if targetInterval == 0 {
// interval can recover to default once api call is successful and no interval is returned from server
targetInterval = DefaultRefreshIntervalSeconds
}
if targetInterval != refreshIntervalSeconds {
// 拉取配置的间隔变更,立即重新调度,会触发再次执行本函数,本次执行可以直接返回
log.GetLogger().Infof("state manager refresh interval changes from %d to %d seconds", refreshIntervalSeconds, targetInterval)
refreshIntervalSeconds = targetInterval
CancelStateManagerTimer()
InitStateManagerTimer()
return
}
log.GetLogger().Infof("use state configurations: %v", result)
updateStateConfigs(result.StateConfigurations)
refreshStateConfigTimers(result.StateConfigurations)
runtime.GC()
}
func enforce(config StateConfiguration) (err error) {
var msg string
var mode = getMode(config)
log.GetLogger().WithFields(logrus.Fields{
"stateConfigurationId": config.StateConfigurationId,
"configureMode": mode,
}).Infof("start enforcing state configuration")
if mode == Skip {
return
}
content, err := LoadTemplateCache(config.TemplateName, config.TemplateVersion)
if err != nil {
log.GetLogger().WithError(err).Warn("load template from cache failed")
}
if content == nil {
resp, err2 := GetTemplate(config.TemplateName, config.TemplateVersion)
if err2 != nil {
log.GetLogger().WithError(err2).Error("GetTemplate failed")
msg = fmt.Sprintf("GetTemplate %s %s failed: %s", config.TemplateName, config.TemplateVersion, err2.Error())
reportResult(config, Failed, mode, map[string]interface{}{"message": msg})
return err2
} else {
content = []byte(resp.Result.Content)
WriteTemplateCache(config.TemplateName, config.TemplateVersion, content)
}
}
resourceStates, err := ParseResourceState(content, config.Parameters)
if err != nil {
return
}
if len(resourceStates) == 0 {
log.GetLogger().Errorf("no state definition is parsed from configuration %s", config.StateConfigurationId)
return
}
var resultStatus, singleStatus string
var extraInfo string
var notSuccessItems = make(map[string]interface{})
switch mode {
case Apply:
for index, rs := range resourceStates {
resultStatus, extraInfo, err = rs.Apply()
if resultStatus == Failed {
notSuccessItems[fmt.Sprintf("%dth state", index)] = err.Error()
break
}
if resultStatus == NotCompliant {
// apply should not return NotCompliant
resultStatus = Failed
notSuccessItems[fmt.Sprintf("%dth state", index)] = extraInfo
break
}
}
case Monitor:
resultStatus = Compliant
for index, rs := range resourceStates {
singleStatus, extraInfo, err = rs.Monitor()
if singleStatus == Failed {
notSuccessItems[fmt.Sprintf("%dth state", index)] = err.Error()
resultStatus = Failed
break
}
if singleStatus == NotCompliant {
notSuccessItems[fmt.Sprintf("%dth state", index)] = extraInfo
resultStatus = NotCompliant
}
}
}
if resultStatus == "" {
resultStatus = Failed
}
log.GetLogger().WithFields(logrus.Fields{
"stateConfigurationId": config.StateConfigurationId,
"configureMode": mode,
}).Infof("result status is %s", resultStatus)
reportResult(config, resultStatus, mode, notSuccessItems)
return
}
func hasApplied(config StateConfiguration) bool {
if config.SuccessfulApplyTime == "" {
return false
}
if config.DefinitionUpdateTime == "" {
log.GetLogger().Errorf("state configuration %s missing DefinitionUpdateTime", config.StateConfigurationId)
// API should always returns DefinitionUpdateTime
// if not, assume state definition is not updated
return true
}
successfulApplyTime, err2 := timetool.ParseApiTime(config.SuccessfulApplyTime)
defUpdateTime, err1 := timetool.ParseApiTime(config.DefinitionUpdateTime)
if err1 != nil || err2 != nil {
log.GetLogger().Errorf("invalid DefinitionUpdateTime %s or SuccessfulApplyTime%s", config.DefinitionUpdateTime, config.SuccessfulApplyTime)
return true
}
if defUpdateTime.After(successfulApplyTime) {
// state definition (template + parameters) updated after last successful apply
// it means new definition has never applied once
return false
}
return true
}
func getMode(config StateConfiguration) string {
switch config.ConfigureMode {
case ApplyOnly:
if hasApplied(config) {
return Skip
}
return Apply
case ApplyAndMonitor:
if hasApplied(config) {
return Monitor
}
return Apply
case ApplyAndAutoCorrect:
return Apply
default:
log.GetLogger().Errorf("invalid configure mode %s", config.ConfigureMode)
return Skip
}
}
func reportResult(config StateConfiguration, status, mode string, extraInfo map[string]interface{}) (err error) {
var extraInfoStr string
if extraInfo != nil && len(extraInfo) > 0 {
data, _ := json.Marshal(extraInfo)
extraInfoStr = string(data)
}
err = PutInstanceStateReport(config.StateConfigurationId, status, extraInfoStr, mode, "")
if err != nil {
log.GetLogger().WithError(err).Error("put state report error")
}
return
}
// InitStateManagerTimer starts timer for state manage feature
func InitStateManagerTimer() error {
if _stateManageTimer == nil {
_statemanageTimerInitLock.Lock()
defer _statemanageTimerInitLock.Unlock()
if _stateManageTimer == nil {
log.GetLogger().Infof("initialize state manager timer with interval %d seconds", refreshIntervalSeconds)
timerManager := timermanager.GetTimerManager()
timer, err := timerManager.CreateTimerInSeconds(refreshStateConfigs, refreshIntervalSeconds)
if err != nil {
log.GetLogger().WithError(err).Error("create state manager timer failed")
return err
}
_stateManageTimer = timer
go func() {
// shuffle state manager task in 3 minutes
mills := rand.Intn(MaxInitTimerDriftSeconds * 1000)
time.Sleep(time.Duration(mills) * time.Millisecond)
log.GetLogger().Info("run state manager timer")
_, err = _stateManageTimer.Run()
if err != nil {
log.GetLogger().WithError(err).Error("run state manager timer failed")
}
}()
return nil
}
return errors.New("state manage timer has been initialized")
}
return errors.New("state manage timer has been initialized")
}
func CancelStateManagerTimer() error {
if _stateManageTimer != nil {
log.GetLogger().Infoln("cancel state manager timer")
_statemanageTimerInitLock.Lock()
defer _statemanageTimerInitLock.Unlock()
if _stateManageTimer != nil {
timermanager.GetTimerManager().DeleteTimer(_stateManageTimer)
_stateManageTimer = nil
}
}
return nil
}
func IsStateManagerTimerRunning() bool {
if _stateManageTimer != nil {
_statemanageTimerInitLock.Lock()
defer _statemanageTimerInitLock.Unlock()
return _stateManageTimer.IsRunning()
}
return false
}
func NewResourceState(state StateDef) (rs resources.ResourceState, err error) {
switch state.ResourceType {
case "ACS:Inventory":
rs = &resources.InventoryState{}
rs.Load(state.Properties)
case "ACS:File":
rs = &resources.FileState{}
rs.Load(state.Properties)
default:
log.GetLogger().Error("unsupported resource type ", state.ResourceType)
err = fmt.Errorf("unsupported resource type %s in template", state.ResourceType)
}
log.GetLogger().Infof("resource state definition: %v", rs)
return
}