agent/heartbeat/heartbeat.go (368 lines of code) (raw):
package heartbeat
import (
"errors"
"fmt"
"math/rand"
"net/url"
"os"
"regexp"
"sync"
"sync/atomic"
"time"
"github.com/tidwall/gjson"
"github.com/aliyun/aliyun_assist_client/agent/checknet"
"github.com/aliyun/aliyun_assist_client/agent/flagging"
"github.com/aliyun/aliyun_assist_client/agent/hybrid/instance"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/agent/taskengine/timermanager"
"github.com/aliyun/aliyun_assist_client/agent/util"
"github.com/aliyun/aliyun_assist_client/agent/util/osutil"
"github.com/aliyun/aliyun_assist_client/agent/util/timetool"
"github.com/aliyun/aliyun_assist_client/agent/version"
"github.com/aliyun/aliyun_assist_client/common/httpbase"
"github.com/aliyun/aliyun_assist_client/common/machineid"
)
const (
// DefaultPingIntervalSeconds is the default interval of heart-beat in seconds
DefaultPingIntervalSeconds = 60
leastIntervalInMilliseconds = 55000
mostIntervalInMilliseconds = 65000
)
var (
// TODO: Centralized manager for timers of essential tasks
_heartbeatTimer *timermanager.Timer
// TODO: Centralized manager for timers of essential tasks, then get rid of this
_heartbeatTimerInitLock sync.Mutex
_startTime time.Time
_retryCounter uint16
_retryMutex *sync.Mutex
_processStartTime time.Time
_acknowledgeCounter uint64
_sendCounter uint64
_lastHttpFailedSendCounter uint64 // record the _sendCounter value when http ping failed
_tryHttp bool
_machineId string
_intervalRand *rand.Rand
// _fieldMissRegexp is used to match error messages for missing fields,
// like: "Required request parameter 'os_type' for method parameter type String is not present"
_fieldMissRegexp = regexp.MustCompile(`Required request parameter '(\w+)' for method parameter type (\w+) is not present`)
// if _useFullFields is true ping hear-beat with full fields,
// otherwise use the reduced fields
_useFullFields atomic.Bool
_networkConnected atomic.Bool
_actionsWhenNetRecover map[string]func()
_actionsWhenNetRecoverLock sync.Mutex
_consecutiveFailedCount int
)
func init() {
_retryCounter = 0
_retryMutex = &sync.Mutex{}
_processStartTime = time.Now()
_acknowledgeCounter = 0
_sendCounter = 0
_lastHttpFailedSendCounter = 0
_tryHttp = true
_machineId, _ = machineid.GetMachineID()
_intervalRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
func invokePingRequest(isHttpScheme bool, urlWithoutScheme string, willSwitchScheme bool) (response string, err error) {
defer func() {
errMsg := extractErrMsg(response)
if errMsg != "" {
log.GetLogger().Error("heart-beat: ", errMsg)
if miss, fieldName, fieldType := checkFieldsdMissErr(errMsg); miss {
_useFullFields.Store(true)
log.GetLogger().Errorf("heart-beat request miss field[%s:%s] ", fieldName, fieldType)
err = fmt.Errorf("request field missing")
}
}
}()
httpRequestURL := "http://" + urlWithoutScheme
httpsRequestURL := "https://" + urlWithoutScheme
var requestURL, switchedRequestUrl *string
if isHttpScheme {
requestURL = &httpRequestURL
switchedRequestUrl = &httpsRequestURL
} else {
requestURL = &httpsRequestURL
switchedRequestUrl = &httpRequestURL
}
err, response = util.HttpGet(*requestURL)
if err != nil {
tmp_err, ok := err.(*httpbase.StatusCodeError)
if !(ok && tmp_err.StatusCode() < 500) {
_retryMutex.Lock()
defer _retryMutex.Unlock()
Gap := time.Since(_startTime)
//more than 1h than reset counter and start time.
if Gap.Minutes() >= 60 {
_retryCounter = 0
_startTime = time.Now()
}
//less than 1h and counter more than 3.
if _retryCounter >= 3 {
log.GetLogger().WithFields(log.Fields{
"requestURL": *requestURL,
"response": response,
}).WithError(err).Errorln("Retry too frequent")
} else {
//do retry
time.Sleep(3 * time.Second)
_retryCounter++
err, response = util.HttpGet(*requestURL)
if err == nil {
// Keep use current scheme next time
if isHttpScheme {
_tryHttp = true
} else {
_tryHttp = false
_lastHttpFailedSendCounter = _sendCounter
}
return response, nil
}
log.GetLogger().WithFields(log.Fields{
"requestURL": *requestURL,
"response": response,
}).WithError(err).Errorln("Retry failed")
if willSwitchScheme {
err, response = util.HttpGet(*switchedRequestUrl)
if err == nil {
// Use another scheme next time
if isHttpScheme {
_tryHttp = false
_lastHttpFailedSendCounter = _sendCounter
} else {
_tryHttp = true
}
return response, nil
}
log.GetLogger().WithFields(log.Fields{
"switchedRequestURL": *switchedRequestUrl,
"response": response,
}).WithError(err).Errorln("Retry failed with switched requestURL")
}
}
}
// Use another scheme next time
if isHttpScheme {
_tryHttp = false
_lastHttpFailedSendCounter = _sendCounter
} else {
_tryHttp = true
}
return "", err
}
return response, nil
}
func randomNextInterval() time.Duration {
nextIntervalInMilliseconds := _intervalRand.Intn(mostIntervalInMilliseconds-leastIntervalInMilliseconds+1) + leastIntervalInMilliseconds
return time.Duration(nextIntervalInMilliseconds) * time.Millisecond
}
func extractNextInterval(content string) time.Duration {
if !gjson.Valid(content) {
log.GetLogger().WithFields(log.Fields{
"response": content,
}).Errorln("Invalid json response")
return randomNextInterval()
}
json := gjson.Parse(content)
nextIntervalField := json.Get("nextInterval")
if !nextIntervalField.Exists() {
log.GetLogger().WithFields(log.Fields{
"response": content,
}).Errorln("nextInterval field not found in json response")
return randomNextInterval()
}
nextIntervalValue, ok := nextIntervalField.Value().(float64)
if !ok {
log.GetLogger().WithFields(log.Fields{
"response": content,
}).Errorln("Invalid nextInterval value in json response")
return randomNextInterval()
}
nextIntervalInMilliseconds := int(nextIntervalValue)
if nextIntervalInMilliseconds < leastIntervalInMilliseconds || nextIntervalInMilliseconds > mostIntervalInMilliseconds {
return randomNextInterval()
}
return time.Duration(nextIntervalInMilliseconds) * time.Millisecond
}
func extractErrMsg(content string) string {
if !gjson.Valid(content) {
log.GetLogger().WithFields(log.Fields{
"response": content,
}).Errorln("Invalid json response")
return ""
}
json := gjson.Parse(content)
errMsgField := json.Get("errMsg")
if !errMsgField.Exists() {
return ""
}
errMsg, ok := errMsgField.Value().(string)
if !ok {
log.GetLogger().WithFields(log.Fields{
"response": content,
}).Errorln("Invalid errMsg value in json response")
return ""
}
return errMsg
}
func checkFieldsdMissErr(errMsg string) (matched bool, fieldName string, fieldType string) {
if _fieldMissRegexp.MatchString(errMsg) {
matched = true
items := _fieldMissRegexp.FindStringSubmatch(errMsg)
if len(items) != 3 {
return
}
fieldName = items[1]
fieldType = items[2]
}
return
}
func doPing() error {
sendCounter := _sendCounter
var querystring string
if _useFullFields.Load() {
querystring = buildFullFieldsPingParams(sendCounter)
} else {
querystring = buildPingParams(sendCounter)
}
// Use non-secure HTTP protocol by default to reduce performance impact from
// TLS in trustable network environment...
isHttpScheme := true
willSwitchScheme := true
// If HTTP protocol is not accessible use HTTPS. Actively try the http protocol
// after 24 * 60 heart-beats
if !_tryHttp {
if sendCounter-_lastHttpFailedSendCounter > 24*60 {
log.GetLogger().Info("heart-beat by https more than 24*60 times, try http")
isHttpScheme = true
_tryHttp = true
} else {
isHttpScheme = false
}
}
// ...but internet for hybrid mode is obviously untrusted
if instance.IsHybrid() {
isHttpScheme = false
willSwitchScheme = false
}
urlWithoutScheme := util.GetPingService() + querystring
responseContent, err := invokePingRequest(isHttpScheme, urlWithoutScheme, willSwitchScheme)
if err != nil {
_consecutiveFailedCount += 1
_networkConnected.Store(false)
log.GetLogger().WithFields(log.Fields{
"requestURLWithourScheme": urlWithoutScheme,
"isHttpScheme": isHttpScheme,
}).WithError(err).Errorln("Failed to invoke ping request")
if _consecutiveFailedCount >= 3 {
if e := checknet.ReportNetworkBlockToSerialPort(err); e != nil {
log.GetLogger().WithError(e).Error("Report network block to serial port failed.")
}
}
return err
} else {
_consecutiveFailedCount = 0
if _networkConnected.CompareAndSwap(false, true) {
go func() {
_actionsWhenNetRecoverLock.Lock()
defer _actionsWhenNetRecoverLock.Unlock()
for name, action := range _actionsWhenNetRecover {
log.GetLogger().Infof("Execute action[%s] after network recover", name)
action()
}
}()
}
}
mutableSchedule, ok := _heartbeatTimer.Schedule.(*timermanager.MutableScheduled)
if !ok {
log.GetLogger().Errorln("Unexpected schedule type of heartbeat timer")
return nil
}
// Not so graceful way to reset interval of timer: too much implementation exposed.
mutableSchedule.SetInterval(extractNextInterval(responseContent))
_heartbeatTimer.RefreshTimer()
return nil
}
// buildPingParams constructs simplified heartbeat request parameters
func buildPingParams(sendCounter uint64) (querystring string) {
uptime := osutil.GetUptimeOfMs()
timestamp := timetool.GetAccurateTime()
pid := os.Getpid()
processUptime := time.Since(_processStartTime).Milliseconds()
acknowledgeCounter := _acknowledgeCounter
querystring = fmt.Sprintf("?uptime=%d×tamp=%d&pid=%d&process_uptime=%d&index=%d&seq_no=%d",
uptime, timestamp, pid, processUptime, acknowledgeCounter, sendCounter)
// Only first heart-beat need to carry extra params
if acknowledgeCounter == 0 {
isColdstart := false
if _isColdstart, err := flagging.IsColdstart(); err != nil {
log.GetLogger().WithError(err).Errorln("Error encountered when detecting cold-start flag")
} else {
isColdstart = _isColdstart
}
virtType := "kvm" // osutil.GetVirtualType() is currently unavailable
osVersion := osutil.GetVersion()
azoneId := util.GetAzoneId()
encodedOsVersion := url.QueryEscape(osVersion)
querystring += fmt.Sprintf("&virt_type=%s&os_version=%s&az=%s&machineid=%s&cold_start=%t", virtType,
encodedOsVersion, azoneId, _machineId, isColdstart)
}
return
}
// buildFullFieldsPingParams constructs a full set of heartbeat request
// parameters to be compatible with servers that do not recognize the simplified
// heartbeat parameters.
func buildFullFieldsPingParams(sendCounter uint64) (querystring string) {
uptime := osutil.GetUptimeOfMs()
timestamp := timetool.GetAccurateTime()
pid := os.Getpid()
processUptime := time.Since(_processStartTime).Milliseconds()
acknowledgeCounter := _acknowledgeCounter
querystring = fmt.Sprintf("?uptime=%d×tamp=%d&pid=%d&process_uptime=%d&index=%d&seq_no=%d",
uptime, timestamp, pid, processUptime, acknowledgeCounter, sendCounter)
virtType := "kvm" // osutil.GetVirtualType() is currently unavailable
osType := osutil.GetOsType()
osVersion := url.QueryEscape(osutil.GetVersion())
azId := util.GetAzoneId()
querystring += fmt.Sprintf("&virt_type=%s&lang=golang&os_type=%s&os_version=%s&app_version=%s&az=%s",
virtType, osType, osVersion, version.AssistVersion, azId)
// Only first heart-beat need to carry extra params
if acknowledgeCounter == 0 {
isColdstart := false
if _isColdstart, err := flagging.IsColdstart(); err != nil {
log.GetLogger().WithError(err).Errorln("Error encountered when detecting cold-start flag")
} else {
isColdstart = _isColdstart
}
querystring += fmt.Sprintf("&cold_start=%t", isColdstart)
}
return
}
func PingwithRetries(retryCount int) {
for i := 0; i < retryCount; i++ {
if err := doPing(); err == nil {
_acknowledgeCounter++
break
}
}
_sendCounter++
}
func pingWithoutRetry() {
// Error(s) encountered during heart-beating has been logged internally,
// simply ignore it here.
if err := doPing(); err == nil {
_acknowledgeCounter++
}
_sendCounter++
}
func InitHeartbeatTimer() error {
if _heartbeatTimer == nil {
_heartbeatTimerInitLock.Lock()
defer _heartbeatTimerInitLock.Unlock()
if _heartbeatTimer == nil {
timerManager := timermanager.GetTimerManager()
timer, err := timerManager.CreateTimerInSeconds(pingWithoutRetry, DefaultPingIntervalSeconds)
if err != nil {
return err
}
_heartbeatTimer = timer
// Heart-beat at starting SHOULD be executed in main goroutine,
// subsequent sending would be invoked in TimerManager goroutines
mutableSchedule, ok := _heartbeatTimer.Schedule.(*timermanager.MutableScheduled)
if !ok {
return errors.New("Unexpected schedule type of heart-beat timer")
}
mutableSchedule.NotImmediately()
_, err = _heartbeatTimer.Run()
if err != nil {
return err
}
return nil
}
return errors.New("Heartbeat timer has been initialized")
}
return errors.New("Heartbeat timer has been initialized")
}
func RegisterActionWhenNetRecover(actions map[string]func()) {
_actionsWhenNetRecoverLock.Lock()
defer _actionsWhenNetRecoverLock.Unlock()
if _actionsWhenNetRecover == nil {
_actionsWhenNetRecover = make(map[string]func())
}
for name, f := range actions {
_actionsWhenNetRecover[name] = f
}
}