in vermeer/apps/master/bl/load_task.go [73:260]
func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state string, workerName string, errorMsg string) {
defer func() {
if r := recover(); r != nil {
logrus.Errorf("LoadTaskStatus panic recover taskID:%v, panic:%v, stack message: %s",
taskId, r, common.GetCurrentGoroutineStack())
}
}()
loadTask := loadGraphMgr.GetLoadTask(taskId)
if loadTask == nil || loadTask.Task.State == structure.TaskStateError || loadTask.Task.State == structure.TaskStateCanceled {
return
}
loadTask.Task.SetWorkerState(workerName, structure.TaskState(state))
if structure.TaskState(state) == structure.TaskStateError {
logrus.Infof("LoadTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
taskMgr.SetError(loadTask.Task, errorMsg)
//loadTask.Task.SetState(structure.TaskStateError)
//loadTask.Task.SetErrMsg(errorMsg)
if loadTask.Task.CreateType == structure.TaskCreateSync {
loadTask.Task.GetWg().Done()
}
graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
//graph.SetState(structure.GraphStateError)
graphMgr.SetError(graph)
for _, wn := range loadTask.Task.Workers {
//worker := WorkerMgr.GetWorker(wn.Name)
//if err := worker.loadServer.AsyncLoad(
if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
taskId,
pb.LoadStep_Error,
"",
loadTask.Task.GraphName,
loadTask.Task.SpaceName,
nil,
nil); err != nil {
logrus.Errorf("failed to perform the AsyncLoad through the worker with name: '%s' in the TaskStatusError state , caused by: %v", wn.Name, err)
}
}
loadTask.FreeMemory()
time.AfterFunc(1*time.Minute, func() { loadGraphMgr.DeleteTask(taskId) })
common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
if err := Scheduler.CloseCurrent(taskId); err != nil {
logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
}
err := graphMgr.SaveInfo(graph.SpaceName, graph.Name)
if err != nil {
logrus.Errorf("save graph info error:%v", err)
}
err = taskMgr.SaveTask(loadTask.Task.ID)
if err != nil {
logrus.Errorf("save task info error:%v", err)
}
} else if loadTask.Task.CheckTaskState(structure.TaskState(state)) {
logrus.Infof("LoadTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
if structure.TaskState(state) == structure.TaskStateLoadVertexOK {
logrus.Infof("load graph TaskStateLoadVertexOK task: %d, graph: %s",
taskId, loadTask.Task.GraphName)
loadTask.Task.SetState(structure.TaskStateLoadScatter)
//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadScatter)
graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
graph.DispatchVertexId()
for _, wn := range loadTask.Task.Workers {
//worker := WorkerMgr.GetWorker(wn.Name)
//if err := worker.loadServer.AsyncLoad(
if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
taskId,
pb.LoadStep_ScatterVertex,
"",
loadTask.Task.GraphName,
loadTask.Task.SpaceName,
nil,
nil); err != nil {
logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s, caused by: %v", wn.Name, err)
}
}
} else if structure.TaskState(state) == structure.TaskStateLoadScatterOK {
logrus.Infof("load graph TaskStateLoadScatterOK task: %d, graph: %s",
taskId, loadTask.Task.GraphName)
loadTask.Task.SetState(structure.TaskStateLoadEdge)
//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadEdge)
for _, wn := range loadTask.Task.Workers {
//worker := WorkerMgr.GetWorker(wn.Name)
//if err := worker.loadServer.AsyncLoad(
if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
taskId,
pb.LoadStep_Edge,
"",
loadTask.Task.GraphName,
loadTask.Task.SpaceName,
nil,
nil); err != nil {
logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoadScatterOK state, caused by: %v",
wn.Name, err)
}
}
} else if structure.TaskState(state) == structure.TaskStateLoadEdgeOK {
logrus.Infof("load graph TaskStateLoadEdgeOK task: %d, graph: %s",
taskId, loadTask.Task.GraphName)
loadTask.Task.SetState(structure.TaskStateLoadDegree)
//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoadDegree)
for _, wn := range loadTask.Task.Workers {
//worker := WorkerMgr.GetWorker(wn.Name)
//if err := worker.loadServer.AsyncLoad(
if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
taskId,
pb.LoadStep_OutDegree,
"",
loadTask.Task.GraphName,
loadTask.Task.SpaceName,
nil,
nil); err != nil {
logrus.Errorf("falied to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoadEdgeOK state, caused by: %v",
workerName, err)
}
}
} else if structure.TaskState(state) == structure.TaskStateLoaded {
logrus.Infof("load graph TaskStateLoaded task: %d, graph: %s, cost: %v",
taskId, loadTask.Task.GraphName, time.Since(loadTask.Task.StartTime))
graph := graphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
//graph.SetState(structure.GraphStateLoaded)
graphMgr.ForceState(graph, structure.GraphStateLoaded)
graph.Statics()
loadTask.Task.SetState(structure.TaskStateLoaded)
//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoaded)
logrus.Infof("graph: %s, vertex: %d, edge: %d", graph.Name, graph.VertexCount, graph.EdgeCount)
for _, w := range graph.Workers {
logrus.Infof(
"graph: %s, worker: %s, vertex: %d, edge: %d",
graph.Name, w.Name, w.VertexCount, w.EdgeCount)
}
for _, wn := range loadTask.Task.Workers {
//worker := WorkerMgr.GetWorker(wn.Name)
//if err := worker.loadServer.AsyncLoad(
if err := ServerMgr.LoadServer(wn.Name).AsyncLoad(
taskId,
pb.LoadStep_Complete,
"",
loadTask.Task.GraphName,
loadTask.Task.SpaceName,
nil,
nil); err != nil {
logrus.Errorf("failed to perform the AsyncLoad through the worker with name: %s in the TaskStatusLoaded state, caused by %v",
wn.Name, err)
}
}
if loadTask.Task.CreateType == structure.TaskCreateSync {
loadTask.Task.GetWg().Done()
}
loadTask.FreeMemory()
time.AfterFunc(1*time.Minute, func() { loadGraphMgr.DeleteTask(taskId) })
common.PrometheusMetrics.GraphLoadedCnt.WithLabelValues().Inc()
common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
if err := Scheduler.CloseCurrent(taskId); err != nil {
logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
}
err := graphMgr.SaveInfo(graph.SpaceName, graph.Name)
if err != nil {
logrus.Errorf("save graph info error:%v", err)
}
err = taskMgr.SaveTask(loadTask.Task.ID)
if err != nil {
logrus.Errorf("save task info error:%v", err)
}
err = taskMgr.FinishTask(loadTask.Task.ID)
if err != nil {
logrus.Errorf("cancel task finished error:%v", err.Error())
}
//开始落盘
go func() {
_, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk)
if !ok {
logrus.Errorf("graph %v write disk failed", graph.Name)
}
}()
}
}
}