in vermeer/apps/master/bl/compute_task.go [252:336]
// SettingGraphStatus handles a state report from workerName for compute task
// taskId and drives the task's next step.
//
// Reports are ignored when the task is unknown or is already in the error or
// canceled state. Otherwise the reported state is recorded for the worker and:
//
//   - if the worker reported structure.TaskStateError, the task is marked as
//     failed with errorMsg, a synchronously created task's wait group is
//     signalled, the graph's using count is decremented, the task's memory is
//     freed, the running-task metric is decremented, an asynchronously created
//     task is closed in the scheduler, and the task is saved;
//   - if Task.CheckTaskState accepts the reported state, then after
//     TaskStateSettingOutEdgesOK either the out-degree action or the compute
//     action is started, and after TaskStateSettingOutDegreeOK the compute
//     action is started. Before the compute action is started, a background
//     write of the graph to disk is kicked off.
//
// Any panic raised while processing is recovered and logged.
func (ctb *ComputeTaskBl) SettingGraphStatus(
	taskId int32, state string, workerName string, errorMsg string) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logrus.Errorf("ComputeTask SettingGraph panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
			common.GetCurrentGoroutineStack())
	}()

	ct := computerTaskMgr.GetTask(taskId)
	if ct == nil {
		return
	}
	// A task that already failed or was canceled takes no further reports.
	if ct.Task.State == structure.TaskStateError || ct.Task.State == structure.TaskStateCanceled {
		return
	}

	reported := structure.TaskState(state)
	ct.Task.SetWorkerState(workerName, reported)

	graph := graphMgr.GetGraphByName(ct.Task.SpaceName, ct.Task.GraphName)
	if graph == nil {
		logrus.Errorf("graph not exist")
		return
	}
	logrus.Infof("ComputeTask SettingGraph task: %d, worker: %s, state: %s",
		taskId, workerName, state)

	// persistGraph starts writing the graph to disk without blocking the
	// caller; a failed write is only logged.
	persistGraph := func() {
		go func() {
			if _, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk); !ok {
				logrus.Errorf("graph %v write disk failed", graph.Name)
			}
		}()
	}

	// failOnStartErr marks the task as failed when starting the next action
	// returned an error.
	// NOTE(review): unlike the worker-error branch below, this path does not
	// signal the wait group, decrement the graph's using count, free memory,
	// update the metric, close the scheduler entry or save the task — confirm
	// that this is handled elsewhere.
	failOnStartErr := func(err error) {
		if err == nil {
			return
		}
		ct.Task.SetState(structure.TaskStateError)
		ct.Task.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
	}

	switch {
	case reported == structure.TaskStateError:
		logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		ct.Task.SetState(structure.TaskStateError)
		ct.Task.SetErrMsg(errorMsg)
		if ct.Task.CreateType == structure.TaskCreateSync {
			ct.Task.GetWg().Done()
		}
		graph.SubUsingNum()
		ct.FreeMemory()
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(ct.Task.Type).Dec()
		if ct.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		}
		if err := taskMgr.SaveTask(ct.Task.ID); err != nil {
			logrus.Errorf("save task info error:%v", err)
		}

	case ct.Task.CheckTaskState(reported):
		switch reported {
		case structure.TaskStateSettingOutEdgesOK:
			graph.UseOutEdges = true
			ct.Task.SetState(structure.TaskStateSettingOutEdgesOK)
			if ct.SettingOutDegree {
				// Out-degree still has to be set before computing.
				failOnStartErr(StartComputeTask(graph, ct, pb.ComputeAction_SettingOutDegree))
				return
			}
			// Start writing the graph to disk, then begin computing.
			persistGraph()
			failOnStartErr(StartComputeTask(graph, ct, pb.ComputeAction_Compute))

		case structure.TaskStateSettingOutDegreeOK:
			graph.UseOutDegree = true
			// Start writing the graph to disk, then begin computing.
			persistGraph()
			ct.Task.SetState(structure.TaskStateSettingOutDegreeOK)
			failOnStartErr(StartComputeTask(graph, ct, pb.ComputeAction_Compute))
		}
	}
}