func()

in vermeer/apps/master/bl/compute_task.go [252:336]


// SettingGraphStatus processes a graph-setting status report that a worker
// sent for a compute task.
//
// Parameters:
//   - taskId:     ID used to look the compute task up in computerTaskMgr.
//   - state:      state reported by the worker, converted to structure.TaskState.
//   - workerName: worker whose state is recorded on the task.
//   - errorMsg:   message stored on the task when state is TaskStateError.
//
// The report is dropped when the task cannot be found, when the task is
// already in TaskStateError or TaskStateCanceled, or when the task's graph
// cannot be found (the worker state has already been recorded by then).
//
// A TaskStateError report marks the task as failed and runs the failure
// cleanup. Any other report is acted on only if Task.CheckTaskState accepts
// it (NOTE(review): presumably this means every worker reached the state -
// confirm against CheckTaskState), and only for TaskStateSettingOutEdgesOK and
// TaskStateSettingOutDegreeOK, where the next compute action is started.
//
// A panic raised while handling the report is recovered and logged.
func (ctb *ComputeTaskBl) SettingGraphStatus(
	taskId int32, state string, workerName string, errorMsg string) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logrus.Errorf("ComputeTask SettingGraph panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
			common.GetCurrentGoroutineStack())
	}()

	computeTask := computerTaskMgr.GetTask(taskId)
	if computeTask == nil {
		return
	}
	// A task that already failed or was canceled takes no further reports.
	switch computeTask.Task.State {
	case structure.TaskStateError, structure.TaskStateCanceled:
		return
	}

	reported := structure.TaskState(state)
	computeTask.Task.SetWorkerState(workerName, reported)

	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph == nil {
		logrus.Errorf("graph not exist")
		return
	}
	logrus.Infof("ComputeTask SettingGraph task: %d, worker: %s, state: %s",
		taskId, workerName, state)

	// Worker-reported failure: mark the task failed, then release everything
	// it was holding and persist the final task record.
	if reported == structure.TaskStateError {
		logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		computeTask.Task.SetState(structure.TaskStateError)
		computeTask.Task.SetErrMsg(errorMsg)
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		graph.SubUsingNum()
		computeTask.FreeMemory()
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if closeErr := Scheduler.CloseCurrent(taskId); closeErr != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, closeErr)
			}
		}
		if saveErr := taskMgr.SaveTask(computeTask.Task.ID); saveErr != nil {
			logrus.Errorf("save task info error:%v", saveErr)
		}
		return
	}

	if !computeTask.Task.CheckTaskState(reported) {
		return
	}

	// persistGraphAsync writes the graph to disk in a background goroutine;
	// a failed write is only logged.
	persistGraphAsync := func() {
		go func() {
			if _, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk); !ok {
				logrus.Errorf("graph %v write disk failed", graph.Name)
			}
		}()
	}

	// Pick the next action. Compute is the default; the out-degree pass is
	// inserted first when the task asks for it.
	nextAction := pb.ComputeAction_Compute
	switch reported {
	case structure.TaskStateSettingOutEdgesOK:
		graph.UseOutEdges = true
		computeTask.Task.SetState(structure.TaskStateSettingOutEdgesOK)
		if computeTask.SettingOutDegree {
			nextAction = pb.ComputeAction_SettingOutDegree
		} else {
			// Start persisting the graph to disk.
			persistGraphAsync()
		}
	case structure.TaskStateSettingOutDegreeOK:
		graph.UseOutDegree = true
		// Start persisting the graph to disk.
		persistGraphAsync()
		computeTask.Task.SetState(structure.TaskStateSettingOutDegreeOK)
	default:
		// Other accepted states need no follow-up here.
		return
	}

	if err := StartComputeTask(graph, computeTask, nextAction); err != nil {
		// NOTE(review): only the state and message are set on this path; none
		// of the cleanup from the worker-reported error branch above (wait
		// group, graph usage count, memory, metric, scheduler, SaveTask) runs.
		// Confirm this is intentional.
		computeTask.Task.SetState(structure.TaskStateError)
		computeTask.Task.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
	}
}