agent/channel/channel_gshell.go (249 lines of code) (raw):
package channel
import (
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"time"
"github.com/aliyun/aliyun_assist_client/agent/clientreport"
"github.com/aliyun/aliyun_assist_client/agent/hybrid/instance"
"github.com/aliyun/aliyun_assist_client/agent/log"
"github.com/aliyun/aliyun_assist_client/agent/metrics"
"github.com/aliyun/aliyun_assist_client/agent/util"
)
const (
defaultGshellMaxFrequencyCount = 10
defaultGshellMaxFrequencyPeriod = 10
)
var (
// limit gshell kick_vm to occur up to gshellMaxFrequencyCount times in gshellMaxFrequencyPeriod second
gshellMaxFrequencyCount = defaultGshellMaxFrequencyCount // 0 means no limit
gshellMaxFrequencyPeriod = defaultGshellMaxFrequencyPeriod // the unit is seconds
gshellMaxFrequencyMutex sync.Mutex
intervalToOpenNoGshellChannel = time.Minute
)
type GshellChannel struct {
*Channel
hGshell *os.File
WaitCheckDone sync.WaitGroup
kickvmTimes []*time.Time
kickvmTimeStart int
kickvmTimeEnd int
}
type gshellStatus struct {
Code int64 `json:"code"`
GshellSupport string `json:"gshellSupport"`
InstanceID string `json:"instanceId"`
RequestID string `json:"requestId"`
Retry int64 `json:"retry"`
ThrottlingConfig struct {
MaxKickVmCount int `json:"maxKickVmCount"`
MaxKickVmPeriod int `json:"maxKickVmPeriod"`
WssCoolDownCount int `json:"wssCoolDownCount"`
WssCoolDownTime int `json:"wssCoolDownTime"`
} `json:"throttlingConfig"`
}
func (c *GshellChannel) IsSupported() bool {
if instance.IsHybrid() {
return false
}
if !c.Working.IsSet() {
if c.startChannelUnsafe() != nil {
return false
}
}
url := util.GetGshellCheckService()
resp, err := util.HttpPost(url, "", "text")
if err != nil {
log.GetLogger().Errorln("HttpPost ", url, "error! ", err)
return false
}
log.GetLogger().Infoln("HttpPost ", url, "OK! resp: ", resp)
var gstatus gshellStatus
if err := json.Unmarshal([]byte(resp), &gstatus); err != nil {
return false
}
c.updateMaxFrequency(gstatus.ThrottlingConfig.MaxKickVmCount, gstatus.ThrottlingConfig.MaxKickVmPeriod)
wssCoolDownCount = gstatus.ThrottlingConfig.WssCoolDownCount
wssCoolDownTime = gstatus.ThrottlingConfig.WssCoolDownTime
return gstatus.GshellSupport == "true"
}
// func (c *GshellChannel) canOpenGshell() bool {
// gShellSupport := false
// c.lock.Lock()
// defer c.lock.Unlock()
// gshellPath := "/dev/virtio-ports/org.qemu.guest_agent.0"
// if runtime.GOOS == "windows" {
// gshellPath = "\\\\.\\Global\\org.qemu.guest_agent.0"
// }
// if f, e := os.Open(gshellPath); e == nil {
// gShellSupport = true
// f.Close()
// }
// log.GetLogger().Infoln("open gshell status:", gshellPath, gShellSupport)
// return gShellSupport
// }
func (c *GshellChannel) StartChannel() error {
if c.Working.IsSet() {
return nil
}
return c.startChannelUnsafe()
}
func (c *GshellChannel) startChannelUnsafe() error {
if !c.Working.CompareAndSwap(false, true) {
log.GetLogger().Warning("startChannelUnsafe run duplicated, return it")
return nil // startChannelUnsage 同一时间只能有一个执行
}
var gshellPath string
var e error
var h *os.File
gshellPath, e = getGshellPath()
if e == nil {
h, e = os.OpenFile(gshellPath, os.O_RDWR, 0666)
}
if e != nil {
metrics.GetChannelFailEvent(
metrics.EVENT_SUBCATEGORY_CHANNEL_GSHELL,
"errormsg", fmt.Sprintf("open gshell failed: %s error: %s", gshellPath, e),
"filepath", gshellPath,
"type", ChannelTypeStr(c.ChannelType),
).ReportEvent()
log.GetLogger().Errorln("open gshell failed:", gshellPath, "error:", e)
c.Working.Clear()
return e
}
log.GetLogger().Infoln("open gshell ok:", gshellPath)
c.hGshell = h
c.WaitCheckDone.Add(1)
go func() {
defer c.Working.Clear()
defer c.hGshell.Close()
defer c.WaitCheckDone.Done()
tick := time.NewTicker(time.Duration(200) * time.Millisecond)
defer tick.Stop()
buf := make([]byte, 2048)
var lastKickvmFreqExceedTime time.Time
for {
<-tick.C
n, err := c.hGshell.Read(buf)
if err == nil && n > 0 {
reachedFrequencyLimit, count, period := c.hasReachFrequencyLimit()
retStr := c.CallBack(string(buf[:n]), ChannelGshellType)
if len(retStr) > 0 {
log.GetLogger().Infoln("write:", retStr)
_, err = c.hGshell.Write([]byte(retStr + "\n"))
if err != nil {
log.GetLogger().Errorln("write error:", err)
report := clientreport.ClientReport{
ReportType: "switch_channel_in_gshell",
Info: fmt.Sprintf("start switch :" + err.Error()),
}
clientreport.SendReport(report)
go c.SwitchChannel()
return
}
}
if reachedFrequencyLimit && (time.Since(lastKickvmFreqExceedTime) > intervalToOpenNoGshellChannel) {
lastKickvmFreqExceedTime = time.Now()
tip := fmt.Sprintf("gshell kick_vm has reached max frequency %d times during %d seconds, "+
"try to open no-gshell channel", count, period)
log.GetLogger().Info(tip)
report := clientreport.ClientReport{
ReportType: "switch_channel_in_gshell",
Info: fmt.Sprintf("start switch :" + tip),
}
clientreport.SendReport(report)
// just try to open no-gshell cahnnel, do not close gshell self
go c.openOtherChannel()
}
}
}
}()
return nil
}
func (c *GshellChannel) StopChannel() error {
c.WaitCheckDone.Wait()
return nil
}
func (c *GshellChannel) SwitchChannel() error {
c.StopChannel()
time.Sleep(time.Duration(1) * time.Second)
err := G_ChannelMgr.SelectAvailableChannel(ChannelGshellType)
if err != nil {
for i := 0; i < 5; i++ {
if err := G_ChannelMgr.SelectAvailableChannelAndReport(ChannelNone, "switch_channel_in_gshell", true); err == nil {
return nil
}
time.Sleep(time.Duration(5) * time.Second)
}
} else {
metrics.GetChannelSwitchEvent(
"type", ChannelTypeStr(G_ChannelMgr.GetCurrentChannelType()),
"info", fmt.Sprintf("success: Current channel is %d", G_ChannelMgr.GetCurrentChannelType()),
"reportType", "switch_channel_in_gshell",
).ReportEvent()
report := clientreport.ClientReport{
ReportType: "switch_channel_in_gshell",
Info: fmt.Sprintf("success: Current channel is %d", G_ChannelMgr.GetCurrentChannelType()),
}
clientreport.SendReport(report)
return nil
}
metrics.GetChannelSwitchEvent(
"type", ChannelTypeStr(G_ChannelMgr.GetCurrentChannelType()),
"info", fmt.Sprintf("fail: no available channel"),
"reportType", "switch_channel_in_gshell",
).ReportEvent()
report := clientreport.ClientReport{
ReportType: "switch_channel_in_gshell",
Info: fmt.Sprintf("fail: no available channel"),
}
clientreport.SendReport(report)
return errors.New("no available channel")
}
func (c *GshellChannel) openOtherChannel() error {
if err := G_ChannelMgr.SelectAvailableChannel(ChannelGshellType); err != nil {
log.GetLogger().Error("open other no-gshell channel failed: ", err)
} else {
log.GetLogger().Infof("open other no-gshell channel[%s] success", ChannelTypeStr(G_ChannelMgr.ActiveChannel.GetChannelType()))
}
return errors.New("no available channel")
}
// hasReachFrequencyLimit: check if gshell kick_vm reaches frequency limit
func (c *GshellChannel) hasReachFrequencyLimit() (bool, int, int) {
if gshellMaxFrequencyCount == 0 {
return false, 0, 0
}
gshellMaxFrequencyMutex.Lock()
defer gshellMaxFrequencyMutex.Unlock()
now := time.Now()
if c.kickvmTimeStart == -1 {
c.kickvmTimes[0] = &now
c.kickvmTimeStart = 0
c.kickvmTimeEnd = 0
return false, gshellMaxFrequencyCount, gshellMaxFrequencyPeriod
}
c.kickvmTimeStart = (c.kickvmTimeStart + 1) % gshellMaxFrequencyCount
if c.kickvmTimeStart == c.kickvmTimeEnd {
lastTimepoint := c.kickvmTimes[c.kickvmTimeEnd]
c.kickvmTimes[c.kickvmTimeStart] = &now
c.kickvmTimeEnd = (c.kickvmTimeEnd + 1) % gshellMaxFrequencyCount
if lastTimepoint.Add(time.Second * time.Duration(gshellMaxFrequencyPeriod)).After(now) {
return true, gshellMaxFrequencyCount, gshellMaxFrequencyPeriod
}
}
c.kickvmTimes[c.kickvmTimeStart] = &now
return false, gshellMaxFrequencyCount, gshellMaxFrequencyPeriod
}
func (c *GshellChannel) updateMaxFrequency(count, period int) {
gshellMaxFrequencyMutex.Lock()
defer gshellMaxFrequencyMutex.Unlock()
if gshellMaxFrequencyCount != count {
if count <= 0 {
gshellMaxFrequencyCount = 0
c.kickvmTimes = nil
} else {
gshellMaxFrequencyCount = count
c.kickvmTimes = make([]*time.Time, gshellMaxFrequencyCount)
}
c.kickvmTimeStart = -1
c.kickvmTimeEnd = -1
}
gshellMaxFrequencyPeriod = period
}
func NewGshellChannel(CallBack OnReceiveMsg) IChannel {
g := &GshellChannel{
Channel: &Channel{
CallBack: CallBack,
ChannelType: ChannelGshellType,
},
kickvmTimes: make([]*time.Time, gshellMaxFrequencyCount),
kickvmTimeStart: -1,
kickvmTimeEnd: -1,
}
g.Working.Clear()
return g
}