in vermeer/apps/master/services/service.go [65:155]
// Init brings up the master service. In order, it:
//
//  1. initializes every package-level manager, the scheduler, the threshold
//     and the service store;
//  2. registers the built-in algorithms and loads algorithm plugins;
//  3. restores graph records from the graph store and re-adds them to GraphMgr;
//  4. restores task records from the task store and re-adds them to TaskMgr;
//  5. puts every waiting task back into the scheduler queue;
//  6. re-applies the persisted "dispatch paused" flag.
//
// It returns the first error reported by a store initialization, a store read,
// GraphMgr.AddGraph or TaskMgr.AddTask. A failure to re-queue a waiting task
// is only logged and does not abort initialization.
func (s *service) Init() error {
	// Handler wiring is currently disabled:
	//s.ComputerHandler = &ComputeTaskServer{}
	//s.SuperStepHandler = &SuperStepServer{}
	//s.LoadGraphHandler = &LoadGraphServer{}

	// Bring up the managers; the call order is kept exactly as before.
	WorkerMgr.Init()
	GraphMgr.Init()
	TaskMgr.Init()
	LoadGraphMgr.Init()
	ComputerTaskMgr.Init()
	AlgorithmMgr.Init()
	AccessMgr.Init()
	SpaceMgr.Init()
	serverMgr.Init()
	scheduler.Init()
	threshold.Init()
	if err := serviceStore.Init(); err != nil {
		return err
	}
	// Disabled: GraphPersistenceTask.Run()

	// Register every built-in algorithm maker, then load plugins.
	for _, builtin := range algorithms.Algorithms {
		AlgorithmMgr.Register(builtin, "built-in")
	}
	AlgorithmMgr.LoadPlugins()

	// Restore the data that was persisted to local disk: graphs first.
	if err := GraphMgr.InitStore(); err != nil {
		return err
	}
	graphs, err := GraphMgr.ReadAllInfo()
	if err != nil {
		return err
	}
	for _, g := range graphs {
		if g == nil {
			logrus.Errorf("load graph info nil")
			continue
		}

		switch {
		case g.OnDisk && (g.State == structure.GraphStateLoaded || g.State == structure.GraphStateOnDisk):
			// The graph is flagged as on disk and was loaded or on disk:
			// mark it incomplete. No need to save the state here.
			g.SetState(structure.GraphStateInComplete)
		case g.State != structure.GraphStateCreated:
			// Any other state except "created" is handed to GraphMgr.SetError
			// (this replaced a direct g.SetState(structure.GraphStateError)).
			GraphMgr.SetError(g)
		}

		// Reset the usage counter (this replaced a direct `UsingNum = 0`).
		g.ResetUsingNum()

		if err := GraphMgr.AddGraph(g); err != nil {
			logrus.Errorf("add graph error:%v", err)
			return err
		}
	}

	// Then the tasks.
	if err := TaskMgr.InitStore(); err != nil {
		return err
	}
	tasks, err := TaskMgr.ReadAllTask()
	if err != nil {
		return err
	}
	for _, t := range tasks {
		if _, err := TaskMgr.AddTask(t); err != nil {
			return err
		}
	}

	// Re-queue the tasks that are still waiting. A failure is logged only,
	// so the remaining tasks are still attempted.
	for _, waiting := range TaskMgr.GetAllWaitingTasks() {
		logrus.Infof("recover a waiting task '%d' of %s/%s", waiting.ID, waiting.SpaceName, waiting.GraphName)
		_, err := scheduler.QueueTask(waiting)
		if err != nil {
			logrus.Errorf("failed to recover a task into the queue tasks, caused by: %v", err)
		}
	}

	// Re-apply the persisted dispatch-pause flag.
	if paused := serviceStore.GetDispatchPause(); paused {
		scheduler.PauseDispatch()
		logrus.Info("recovered dispatching to be paused")
	}

	return nil
}