agent/commandermanager/commandermanager.go (215 lines of code) (raw):

package commandermanager import ( "fmt" "os" "path/filepath" "sync" "time" "github.com/aliyun/aliyun_assist_client/agent/log" "github.com/aliyun/aliyun_assist_client/agent/pluginmanager" "github.com/aliyun/aliyun_assist_client/agent/pluginmanager/acspluginmanager" "github.com/aliyun/aliyun_assist_client/agent/taskengine/taskerrors" "github.com/aliyun/aliyun_assist_client/common/fileutil" "github.com/aliyun/aliyun_assist_client/interprocess/messagebus/buses" "github.com/aliyun/aliyun_assist_client/thirdparty/sirupsen/logrus" "github.com/google/uuid" ) type CommanderManager struct { // name -> commander commanders map[string]*Commander // name -> version needUpdate map[string]string // name -> handshaketoken handshakeToken sync.Map // underlying plugin manager pluginManager *acspluginmanager.PluginManager l sync.Mutex } const ( ContainerCommanderName = "ACS-ECS-ContainerCommander" AgentApiVersion = "v1" ) var _cm *CommanderManager // InitCommanderManager init commander manager and load Commander which names // commanderName from installed_plugins file, all local Commanders will be // loaded if commanderName is empty. func InitCommanderManager(commanderName string) { _cm = &CommanderManager{} _cm.commanders = make(map[string]*Commander) _cm.needUpdate = make(map[string]string) pluginManager, err := acspluginmanager.NewPluginManager(false) if err != nil { log.GetLogger().WithError(err).Error("InitCommanderManager: get PLUGINDIR failed.") return } _cm.pluginManager = pluginManager _cm.loadCommanderFromLocal(commanderName) pluginmanager.SetUpdateHandler(func(name, version string) bool { return _cm.markUpdate(name, version) }) } // GetCommanderRunning return the named Commander, Commander will be started // before return. func GetCommanderRunning(name string) (*Commander, *taskerrors.CommanderError) { return _cm.getCommanderRunning(name) } // GetCommander return the named Commander. func GetCommander(name string) (*Commander, *taskerrors.CommanderError) { return _cm.getCommander(name) } // GetCommanderWhenHandshake will be called during the handshake, it finds // the named Commander and check handshake token. func GetCommanderWhenHandshake(name, token string) (*Commander, error) { return _cm.getCommanderWhenHandshake(name, token) } func (m *CommanderManager) loadCommanderFromLocal(name string) error { commanders, err := acspluginmanager.LoadAllPluginFromLocal(pluginmanager.PLUGIN_COMMANDER) if err != nil { return err } if len(commanders) == 0 { log.GetLogger().Warn("no commander found") if name != "" { return fmt.Errorf("not found") } return nil } var found bool for _, c := range commanders { if name != "" && c.Name != name { continue } cmdPath := m.pluginManager.GetPluginCommandPath(&c) if !fileutil.CheckFileIsExist(cmdPath) { log.GetLogger().Errorf("Commander %s found in local but the cmdPath[%s] not exist", c.Name, cmdPath) continue } endpoint := buses.NewEndpoint(c.CommanderInfo.EndpointType, filepath.Join(os.TempDir(), c.CommanderInfo.EndpointFile)) if c.CommanderInfo.EndpointType == "npipe" { endpoint = buses.NewEndpoint(c.CommanderInfo.EndpointType, c.CommanderInfo.EndpointFile) } pidFile := filepath.Join(os.TempDir(), c.CommanderInfo.PidFile) cfg := &CommanderConfig{ CommanderName: c.Name, CmdPath: cmdPath, PidFile: pidFile, Endpoint: endpoint, Version: c.Version, ApiVersion: c.CommanderInfo.ApiVersion, // default timeout StartTimeout: time.Duration(5) * time.Second, AttachTimeout: time.Duration(5) * time.Second, DialTimeout: time.Duration(5) * time.Second, } commander := NewCommander(cfg) m.commanders[c.Name] = commander found = true log.GetLogger().WithFields(logrus.Fields{ "name": c.Name, "version": c.Version, }).Info("commander loaded") } if name != "" && !found { return fmt.Errorf("not found") } return nil } func (m *CommanderManager) installCommanderFromOnline(name, version string) error { pluginInfo, err := acspluginmanager.QueryPluginFromOnline(name, pluginmanager.PLUGIN_COMMANDER, version) if err != nil { log.GetLogger().WithFields(logrus.Fields{ "name": name, "version": version, }).WithError(err).Error("query commander from online failed") return err } // TODO: timeout for installing plugin from online if err := m.pluginManager.InstallPluginFromOnline(pluginInfo, 30); err != nil { log.GetLogger().WithFields(logrus.Fields{ "name": name, "version": version, }).WithError(err).Error("install commander from online failed") return err } log.GetLogger().WithFields(logrus.Fields{ "name": pluginInfo.Name, "version": pluginInfo.Version, }).Info("commander installed from online") return nil } func (m *CommanderManager) getCommanderRunning(name string) (*Commander, *taskerrors.CommanderError) { m.l.Lock() defer m.l.Unlock() // 1. If commander is loaded and is running, return it. if c, ok := m.commanders[name]; ok && c.IsRunning() { return c, nil } // Generate handshake token handshakeToken := uuid.NewString() m.handshakeToken.Store(name, handshakeToken) defer m.handshakeToken.Delete(name) // 2. If commander need update install new version from online, // else start the loaded commander and return it. errMsg := []string{} if v, ok := m.needUpdate[name]; ok { log.GetLogger().Infof("try to update commander[%s] to version[%s]", name, v) if err := m.installCommanderFromOnline(name, v); err != nil { errMsg = append(errMsg, fmt.Sprintf("install %s %s from online, %v", name, v, err)) } } else if c, ok := m.commanders[name]; ok { if err := c.Start(handshakeToken); err == nil { log.GetLogger().Info("start commander success") return c, nil } else { log.GetLogger().Error("start commander failed: ", err) } } // 3. Try to load commander from local or install from online, // then start commander and return it. if err := m.loadCommanderFromLocal(name); err != nil { errMsg = append(errMsg, fmt.Sprintf("load %s from local, %v", name, err)) if err := m.installCommanderFromOnline(name, ""); err != nil { errMsg = append(errMsg, fmt.Sprintf("install %s from online, %v", name, err)) return nil, taskerrors.NewLoadCommanderFailedError(errMsg...) } if err := m.loadCommanderFromLocal(name); err != nil { errMsg = append(errMsg, fmt.Sprintf("load %s from local, %v", name, err)) } } if c, ok := m.commanders[name]; !ok { errMsg = append(errMsg, fmt.Sprintf("%s not found", name)) return nil, taskerrors.NewLoadCommanderFailedError(errMsg...) } else { return c, c.Start(handshakeToken) } } func (m *CommanderManager) getCommander(name string) (*Commander, *taskerrors.CommanderError) { m.l.Lock() defer m.l.Unlock() // 1. If commander is loaded, return it. if c, ok := m.commanders[name]; ok { return c, nil } // 2. Install commander from online. errMsg := []string{} if err := m.installCommanderFromOnline(name, ""); err != nil { errMsg = append(errMsg, fmt.Sprintf("install %s from online, %v", name, err)) } // 3. Try to load commander from local and return it. if err := m.loadCommanderFromLocal(name); err != nil { errMsg = append(errMsg, fmt.Sprintf("load %s from local, %v", name, err)) } if c, ok := m.commanders[name]; !ok { errMsg = append(errMsg, fmt.Sprintf("%s not found", name)) return nil, taskerrors.NewLoadCommanderFailedError(errMsg...) } else { return c, nil } } func (m *CommanderManager) getCommanderWhenHandshake(name, token string) (*Commander, error) { if t, ok := m.handshakeToken.Load(name); ok { if tt, ok := t.(string); ok && tt == token { if c, ok := m.commanders[name]; ok { return c, nil } else { return nil, fmt.Errorf("commander not found") } } else { return nil, fmt.Errorf("commander's handshake not match") } } return nil, fmt.Errorf("commander's handshake not found") } func (m *CommanderManager) markUpdate(name, version string) bool { m.l.Lock() defer m.l.Unlock() var c *Commander var ok bool if c, ok = m.commanders[name]; !ok { return false } if c.config.Version == version { log.GetLogger().Infof("commander[%s] version is already %s, no need to update", name, version) } else { log.GetLogger().Infof("mark commander[%s] need to update from %s to %s", name, c.config.Version, version) m.needUpdate[name] = version } return true }