agent/channel/channel_manager.go (319 lines of code) (raw):

package channel

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/aliyun/aliyun_assist_client/agent/clientreport"
	"github.com/aliyun/aliyun_assist_client/agent/hybrid"
	"github.com/aliyun/aliyun_assist_client/agent/hybrid/instance"
	"github.com/aliyun/aliyun_assist_client/agent/kickvmhandle"
	"github.com/aliyun/aliyun_assist_client/agent/log"
	"github.com/aliyun/aliyun_assist_client/agent/metrics"
	"github.com/aliyun/aliyun_assist_client/agent/taskengine"
	"github.com/aliyun/aliyun_assist_client/agent/update"
	"github.com/aliyun/aliyun_assist_client/agent/util/powerutil"
	"github.com/tidwall/gjson"
)

// _gshellChannel is created by TryStartGshellChannel and registered as
// AllChannel[0] by ChannelMgr.Init.
// NOTE(review): if Init runs before TryStartGshellChannel, this is still a nil
// interface and calling any of its methods from the manager will panic.
var _gshellChannel IChannel

// _websocketChannel is created by ChannelMgr.Init and registered as
// AllChannel[1].
var _websocketChannel *WebSocketChannel

// ChannelMgr manages all channels and tracks which one is currently in use.
type ChannelMgr struct {
	ActiveChannel   IChannel       // channel currently in use; nil until one is selected
	AllChannel      []IChannel     // filled by Init as [gshell, websocket]
	StopChanelEvent chan struct{}  // makes the periodic check goroutine started by Init exit
	WaitCheckDone   sync.WaitGroup // tracks the periodic check goroutine
	ChannelSetLock  sync.Mutex     // guards ActiveChannel and AllChannel
}

// G_ChannelMgr is the package-level channel manager used by the helper
// functions below.
var G_ChannelMgr = &ChannelMgr{
	StopChanelEvent: make(chan struct{}),
}

// checkChannelWorker tries to switch from a non-gshell channel back to the
// gshell channel. When gshell is supported and working, it stops the
// websocket channel (if working), makes gshell active and returns true.
// Otherwise it returns false and leaves the active channel unchanged.
func (m *ChannelMgr) checkChannelWorker() bool {
	log.GetLogger().Infoln("checkChannelWorker")
	m.ChannelSetLock.Lock()
	defer m.ChannelSetLock.Unlock()

	// Init lays AllChannel out as [gshell, websocket]; until it has done so
	// there is nothing to switch between.
	if len(m.AllChannel) < 2 {
		return false
	}
	gshellChan, wsChan := m.AllChannel[0], m.AllChannel[1]
	if !gshellChan.IsSupported() || !gshellChan.IsWorking() {
		return false
	}
	if wsChan.IsWorking() {
		wsChan.StopChannel()
	}
	m.ActiveChannel = gshellChan
	return true
}

// SelectAvailableChannel makes a channel whose type differs from
// currentChannel the active one (pass ChannelNone to consider every channel).
// It first prefers a supported channel that is already working, then tries to
// start each supported channel in AllChannel order. It returns an error when
// no channel could be selected.
func (m *ChannelMgr) SelectAvailableChannel(currentChannel int) error {
	log.GetLogger().Infoln("SelectAvailableChannel")
	m.ChannelSetLock.Lock()
	defer m.ChannelSetLock.Unlock()

	for _, item := range m.AllChannel {
		if item.GetChannelType() == currentChannel {
			continue
		}
		if item.IsSupported() && item.IsWorking() {
			// Record the working channel so ActiveChannel does not keep pointing
			// at a channel that has stopped.
			m.ActiveChannel = item
			return nil
		}
	}
	for _, item := range m.AllChannel {
		if item.GetChannelType() == currentChannel || !item.IsSupported() {
			continue
		}
		if err := item.StartChannel(); err == nil {
			m.ActiveChannel = item
			return nil
		}
	}
	return errors.New("No available channel")
}

// Init starts a goroutine that runs a channel check every 30 minutes until
// StopChanelEvent is signalled. It then registers the gshell and websocket
// channels (in that order) and activates the first supported one that starts
// successfully. CallBack is passed to the websocket channel. When no channel
// can be started, Init returns an error, but the periodic check keeps running.
func (m *ChannelMgr) Init(CallBack OnReceiveMsg) error {
	m.WaitCheckDone.Add(1)
	go func() {
		defer m.WaitCheckDone.Done()
		tick := time.NewTicker(30 * time.Minute)
		defer tick.Stop()
		for {
			select {
			case <-m.StopChanelEvent:
				return
			case <-tick.C:
				// Prefer returning to gshell; otherwise make sure some channel is up.
				if !m.checkChannelWorker() {
					m.SelectAvailableChannelAndReport(ChannelNone, "switch_channel_in_timer", false)
				}
			}
		}
	}()

	m.ChannelSetLock.Lock()
	defer m.ChannelSetLock.Unlock()
	_websocketChannel = NewWebsocketChannel(CallBack)
	m.AllChannel = append(m.AllChannel, _gshellChannel, _websocketChannel)
	for _, item := range m.AllChannel {
		if !item.IsSupported() {
			continue
		}
		if err := item.StartChannel(); err != nil {
			log.GetLogger().Errorln("StartChannel failed:", ChannelTypeStr(item.GetChannelType()), err)
			continue
		}
		m.ActiveChannel = item
		return nil
	}
	return errors.New("No available channel")
}

// SelectAvailableChannelAndReport calls SelectAvailableChannel, then emits a
// channel-switch metric event and a client report tagged with reason. A
// failure is not reported when ignoreFailed is true. The selection error, if
// any, is returned.
//
// NOTE(review): currentChannel is not forwarded; every channel is a candidate
// (ChannelNone is passed). Confirm whether this is intentional before changing
// it, as callers outside this file may depend on it.
func (m *ChannelMgr) SelectAvailableChannelAndReport(currentChannel int, reason string, ignoreFailed bool) error {
	err := m.SelectAvailableChannel(ChannelNone)
	if err != nil && ignoreFailed {
		return err
	}

	report := clientreport.ClientReport{ReportType: reason}
	if err == nil {
		report.Info = fmt.Sprintf("success: Current channel is %d", m.GetCurrentChannelType())
	} else {
		// Concatenate rather than use the message as a format string: a '%'
		// in the error text would otherwise be misinterpreted.
		report.Info = "fail:" + err.Error()
	}
	metrics.GetChannelSwitchEvent(
		"type", ChannelTypeStr(m.GetCurrentChannelType()),
		"reportType", report.ReportType,
		"info", report.Info,
	).ReportEvent()
	clientreport.SendReport(report)
	return err
}

// func (m *ChannelMgr) Uninit() {
// 	m.StopChanelEvent <- struct{}{}
// 	m.WaitCheckDone.Wait()
// 	m.ChannelSetLock.Lock()
// 	defer m.ChannelSetLock.Unlock()
// 	m.ActiveChannel.StopChannel()
// }

// GetCurrentChannelType returns the type of the active channel, or
// ChannelNone when no channel is active.
func (m *ChannelMgr) GetCurrentChannelType() int {
	m.ChannelSetLock.Lock()
	defer m.ChannelSetLock.Unlock()
	if m.ActiveChannel == nil {
		return ChannelNone
	}
	return m.ActiveChannel.GetChannelType()
}

// InitChannelMgr initializes G_ChannelMgr with the given message callback.
func InitChannelMgr(CallBack OnReceiveMsg) error {
	return G_ChannelMgr.Init(CallBack)
}

// func StopChannelMgr() error {
// 	G_ChannelMgr.Uninit()
// 	return nil
// }

// GetCurrentChannelType returns the active channel type of G_ChannelMgr.
func GetCurrentChannelType() int {
	return G_ChannelMgr.GetCurrentChannelType()
}

// TryStartGshellChannel creates the gshell channel with OnRecvMsg as its
// callback and, unless the instance is hybrid, tries to start it. The outcome
// is only logged.
func TryStartGshellChannel() {
	_gshellChannel = NewGshellChannel(OnRecvMsg)
	if instance.IsHybrid() {
		return
	}
	if err := _gshellChannel.StartChannel(); err != nil {
		log.GetLogger().Infoln("TryStartGshellChannel failed ", err)
	} else {
		log.GetLogger().Infoln("TryStartGshellChannel ok ")
	}
}

// StartChannelMgr initializes G_ChannelMgr with OnRecvMsg and reports a
// channel-manager failure event when no channel could be started.
func StartChannelMgr() {
	if err := InitChannelMgr(OnRecvMsg); err != nil {
		metrics.GetChannelFailEvent(
			metrics.EVENT_SUBCATEGORY_CHANNEL_MGR,
			"type", "channelmgr",
			"errormsg", err.Error(),
		).ReportEvent()
	}
}

// GshellInvalid is the JSON error reply returned for malformed or unknown
// gshell messages.
type GshellInvalid struct {
	Error struct {
		Class string `json:"class"`
		Desc  string `json:"desc"`
	} `json:"error"`
}

// GshellCheck is the "guest-sync" request.
type GshellCheck struct {
	Execute   string `json:"execute"`
	Arguments struct {
		ID int64 `json:"id"`
	} `json:"arguments"`
}

// GshellCheckReply echoes the ID of a GshellCheck request.
type GshellCheckReply struct {
	Return int64 `json:"return"`
}

// GshellCmd is the "guest-command" request.
type GshellCmd struct {
	Execute   string `json:"execute"`
	Arguments struct {
		Cmd string `json:"cmd"`
	} `json:"arguments"`
}

// GshellCmdReply is the reply to "guest-command" and "guest-shutdown".
type GshellCmdReply struct {
	Return struct {
		CmdOutput string `json:"cmd_output"`
		// netcheck field would not presented when no diagnostic result available
		Netcheck *NetcheckReply `json:"netcheck,omitempty"`
		Result   int            `json:"result"`
	} `json:"return"`
}

// GshellShutdown is the "guest-shutdown" request.
type GshellShutdown struct {
	Execute   string `json:"execute"`
	Arguments struct {
		Mode string `json:"mode"`
	} `json:"arguments"`
}

// BuildInvalidRet returns the JSON encoding of a GshellInvalid reply with
// class "GenericError" and the given description.
func BuildInvalidRet(desc string) string {
	InvalidRet := GshellInvalid{}
	InvalidRet.Error.Class = "GenericError"
	InvalidRet.Error.Desc = desc
	// Marshal cannot fail for this plain struct of strings.
	retStr, _ := json.Marshal(InvalidRet)
	return string(retStr)
}

// marshalGshellCmdReply returns the JSON encoding of a GshellCmdReply with the
// given result code, output text and optional netcheck (omitted when nil).
func marshalGshellCmdReply(result int, output string, netcheck *NetcheckReply) string {
	reply := GshellCmdReply{}
	reply.Return.Result = result
	reply.Return.CmdOutput = output
	reply.Return.Netcheck = netcheck
	// The error is ignored, as in the original inline code.
	// NOTE(review): Marshal could only fail via NetcheckReply, whose definition
	// is not shown here — confirm it cannot.
	retStr, _ := json.Marshal(reply)
	return string(retStr)
}

// startKickVMHandle parses cmd with kickvmhandle.ParseOption. When the result
// is non-nil and passes CheckAction, it runs DoAction in a new goroutine and
// returns true. Otherwise it returns false.
func startKickVMHandle(cmd string) bool {
	handle := kickvmhandle.ParseOption(cmd)
	if handle == nil || !handle.CheckAction() {
		return false
	}
	go handle.DoAction()
	return true
}

// onRecvWebsocketKick handles the legacy plain-text messages of the websocket
// channel. The reply is the message prefixed with "reject:", "accept:" or
// "unknow:".
func onRecvWebsocketKick(msg string) string {
	if update.IsCriticalActionRunning() {
		return "reject:" + msg
	}
	if msg == "kick_vm" {
		go taskengine.Fetch(true, "", taskengine.NormalTaskType)
		return "accept:" + msg
	}
	if strings.Contains(msg, "kick_vm agent deregister") {
		hybrid.UnRegister(true)
	}
	if !startKickVMHandle(msg) {
		return "unknow:" + msg
	}
	return "accept:" + msg
}

// onRecvGuestSync answers a "guest-sync" request by echoing its ID.
func onRecvGuestSync(msg string) string {
	var check GshellCheck
	if err := json.Unmarshal([]byte(msg), &check); err != nil {
		return BuildInvalidRet("invalid json: " + err.Error())
	}
	retStr, _ := json.Marshal(GshellCheckReply{Return: check.Arguments.ID})
	return string(retStr)
}

// onRecvGuestCommand handles a "guest-command" request. It replies with
// result 7 while a critical update action runs, 6 for unrecognised commands,
// and 8 once kick_vm or a valid kick handle has been dispatched
// asynchronously.
func onRecvGuestCommand(msg string) string {
	var cmd GshellCmd
	if err := json.Unmarshal([]byte(msg), &cmd); err != nil {
		return BuildInvalidRet("invalid guest-command json: " + err.Error())
	}
	if update.IsCriticalActionRunning() {
		return marshalGshellCmdReply(7, "agent is busy", nil)
	}
	if cmd.Arguments.Cmd == "kick_vm" {
		go taskengine.Fetch(true, "", taskengine.NormalTaskType)
		return marshalGshellCmdReply(8, "execute kick_vm success", LastNetcheckReply())
	}
	if !startKickVMHandle(cmd.Arguments.Cmd) {
		return marshalGshellCmdReply(6, "invalid command", nil)
	}
	return marshalGshellCmdReply(8, "execute kick_vm success", LastNetcheckReply())
}

// onRecvGuestShutdown handles a "guest-shutdown" request. Mode must be
// powerutil.PowerdownMode or powerutil.RebootMode. powerutil.Shutdown is
// called before the success reply is returned.
// NOTE(review): if powerutil.Shutdown does not return promptly, the reply may
// never reach the sender — confirm this is acceptable.
func onRecvGuestShutdown(msg string) string {
	var shutdown GshellShutdown
	if err := json.Unmarshal([]byte(msg), &shutdown); err != nil {
		return BuildInvalidRet("invalid guest-shutdown command: " + err.Error())
	}
	var reboot bool
	switch shutdown.Arguments.Mode {
	case powerutil.PowerdownMode:
		reboot = false
	case powerutil.RebootMode:
		reboot = true
	default:
		return BuildInvalidRet("invalid guest-shutdown command")
	}
	reply := marshalGshellCmdReply(8, "execute command success", nil)
	powerutil.Shutdown(reboot)
	return reply
}

// OnRecvMsg is the message callback shared by all channels. It returns the
// reply to send back over the channel identified by ChannelType. Websocket
// messages use a legacy plain-text protocol. All other channels send JSON
// whose "execute" field selects the handler.
func OnRecvMsg(Msg string, ChannelType int) string {
	log.GetLogger().Infoln("kick msg:", Msg)
	// legacy code for websocket kick data proc.
	if ChannelType == ChannelWebsocketType {
		return onRecvWebsocketKick(Msg)
	}
	if !gjson.Valid(Msg) {
		return BuildInvalidRet("invalid json1")
	}
	switch gjson.Get(Msg, "execute").String() {
	case "guest-sync":
		return onRecvGuestSync(Msg)
	case "guest-command":
		return onRecvGuestCommand(Msg)
	case "guest-shutdown":
		return onRecvGuestShutdown(Msg)
	}
	return BuildInvalidRet("invalid command")
}

// OnNetworkRecover will be called when network recover. It resets the
// websocket channel's failure count (when that channel exists) and reselects
// an available channel, reporting the outcome.
func OnNetworkRecover() {
	// _websocketChannel is only set by ChannelMgr.Init; guard against an
	// earlier call.
	if _websocketChannel != nil {
		_websocketChannel.ResetFailedCount()
	}
	G_ChannelMgr.SelectAvailableChannelAndReport(ChannelNone, "select_available_chan_when_net_recover", false)
}