func()

in vermeer/apps/master/bl/load_task.go [73:260]


func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state string, workerName string, errorMsg string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("LoadTaskStatus panic recover taskID:%v, panic:%v, stack message: %s",
				taskId, r, common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := loadGraphMgr.GetLoadTask(taskId)
	if loadTask == nil || loadTask.Task.State == structure.TaskStateError || loadTask.Task.State == structure.TaskStateCanceled {
		return
	}
	loadTask.Task.SetWorkerState(workerName, structure.TaskState(state))
	if structure.TaskState(state) == structure.TaskStateError {
		logrus.Infof("LoadTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		taskMgr.SetError(loadTask.Task, errorMsg)
		//loadTask.Task.SetState(structure.TaskStateError)
		//loadTask.Task.SetErrMsg(errorMsg)
		if loadTask.Task.CreateType == structure.TaskCreateSync {
			loadTask.Task.GetWg().Done()
		}
		graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
		//graph.SetState(structure.GraphStateError)
		graphMgr.SetError(graph)

		for _, wn := range loadTask.Task.Workers {
			//worker := WorkerMgr.GetWorker(wn.Name)
			//if err := worker.loadServer.AsyncLoad(
			if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
				taskId,
				pb.LoadStep_Error,
				"",
				loadTask.Task.GraphName,
				loadTask.Task.SpaceName,
				nil,
				nil); err != nil {
				logrus.Errorf("failed to perform the AsyncLoad through the worker with name: '%s' in the TaskStatusError state , caused by: %v", wn.Name, err)
			}
		}
		loadTask.FreeMemory()
		time.AfterFunc(1*time.Minute, func() { loadGraphMgr.DeleteTask(taskId) })
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
		if err := Scheduler.CloseCurrent(taskId); err != nil {
			logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
		}
		err := graphMgr.SaveInfo(graph.SpaceName, graph.Name)
		if err != nil {
			logrus.Errorf("save graph info error:%v", err)
		}
		err = taskMgr.SaveTask(loadTask.Task.ID)
		if err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
	} else if loadTask.Task.CheckTaskState(structure.TaskState(state)) {
		logrus.Infof("LoadTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		if structure.TaskState(state) == structure.TaskStateLoadVertexOK {
			logrus.Infof("load graph TaskStateLoadVertexOK task: %d, graph: %s",
				taskId, loadTask.Task.GraphName)

			loadTask.Task.SetState(structure.TaskStateLoadScatter)
			//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadScatter)

			graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
			graph.DispatchVertexId()

			for _, wn := range loadTask.Task.Workers {
				//worker := WorkerMgr.GetWorker(wn.Name)
				//if err := worker.loadServer.AsyncLoad(
				if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
					taskId,
					pb.LoadStep_ScatterVertex,
					"",
					loadTask.Task.GraphName,
					loadTask.Task.SpaceName,
					nil,
					nil); err != nil {
					logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s, caused by: %v", wn.Name, err)
				}
			}
		} else if structure.TaskState(state) == structure.TaskStateLoadScatterOK {
			logrus.Infof("load graph TaskStateLoadScatterOK task: %d, graph: %s",
				taskId, loadTask.Task.GraphName)

			loadTask.Task.SetState(structure.TaskStateLoadEdge)
			//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadEdge)

			for _, wn := range loadTask.Task.Workers {
				//worker := WorkerMgr.GetWorker(wn.Name)
				//if err := worker.loadServer.AsyncLoad(
				if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
					taskId,
					pb.LoadStep_Edge,
					"",
					loadTask.Task.GraphName,
					loadTask.Task.SpaceName,
					nil,
					nil); err != nil {
					logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoadScatterOK state, caused by: %v",
						wn.Name, err)
				}
			}
		} else if structure.TaskState(state) == structure.TaskStateLoadEdgeOK {
			logrus.Infof("load graph TaskStateLoadEdgeOK task: %d, graph: %s",
				taskId, loadTask.Task.GraphName)

			loadTask.Task.SetState(structure.TaskStateLoadDegree)
			//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadDegree)

			for _, wn := range loadTask.Task.Workers {
				//worker := WorkerMgr.GetWorker(wn.Name)
				//if err := worker.loadServer.AsyncLoad(
				if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
					taskId,
					pb.LoadStep_OutDegree,
					"",
					loadTask.Task.GraphName,
					loadTask.Task.SpaceName,
					nil,
					nil); err != nil {
					logrus.Errorf("falied to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoadEdgeOK state, caused by: %v",
						workerName, err)
				}
			}
		} else if structure.TaskState(state) == structure.TaskStateLoaded {
			logrus.Infof("load graph TaskStateLoaded task: %d, graph: %s, cost: %v",
				taskId, loadTask.Task.GraphName, time.Since(loadTask.Task.StartTime))
			graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)

			//graph.SetState(structure.GraphStateLoaded)
			graphMgr.ForceState(graph, structure.GraphStateLoaded)
			graph.Statics()

			loadTask.Task.SetState(structure.TaskStateLoaded)
			//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoaded)

			logrus.Infof("graph: %s, vertex: %d, edge: %d", graph.Name, graph.VertexCount, graph.EdgeCount)
			for _, w := range graph.Workers {
				logrus.Infof(
					"graph: %s, worker: %s, vertex: %d, edge: %d",
					graph.Name, w.Name, w.VertexCount, w.EdgeCount)
			}

			for _, wn := range loadTask.Task.Workers {
				//worker := WorkerMgr.GetWorker(wn.Name)
				//if err := worker.loadServer.AsyncLoad(
				if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
					taskId,
					pb.LoadStep_Complete,
					"",
					loadTask.Task.GraphName,
					loadTask.Task.SpaceName,
					nil,
					nil); err != nil {
					logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoaded state, caused by %v",
						wn.Name, err)
				}
			}
			if loadTask.Task.CreateType == structure.TaskCreateSync {
				loadTask.Task.GetWg().Done()
			}
			loadTask.FreeMemory()
			time.AfterFunc(1*time.Minute, func() { loadGraphMgr.DeleteTask(taskId) })
			common.PrometheusMetrics.GraphLoadedCnt.WithLabelValues().Inc()
			common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
			err := graphMgr.SaveInfo(graph.SpaceName, graph.Name)
			if err != nil {
				logrus.Errorf("save graph info error:%v", err)
			}
			err = taskMgr.SaveTask(loadTask.Task.ID)
			if err != nil {
				logrus.Errorf("save task info error:%v", err)
			}
			err = taskMgr.FinishTask(loadTask.Task.ID)
			if err != nil {
				logrus.Errorf("cancel task finished error:%v", err.Error())
			}
			//开始落盘
			go func() {
				_, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk)
				if !ok {
					logrus.Errorf("graph %v write disk failed", graph.Name)
				}
			}()
		}
	}
}