in vermeer/apps/master/bl/task_starter.go [186:273]
// StartTask launches the compute task held by cts against the graph named by
// the task's SpaceName/GraphName.
//
// Steps, in order:
//  1. Call accessMgr.Access for the graph and run preStartTask; a preStartTask
//     failure is only logged and does not abort the start.
//  2. Look up the graph and reject it when it is missing, has no workers, or
//     fails tasks.CheckAlgoComputable.
//  3. If the graph state is OnDisk, run a Read persistence operation and move
//     the state to Loaded on success.
//  4. Build the compute task, resolve its algorithm maker, and work out whether
//     out-edges / out-degree must be set up before computing.
//  5. Append one TaskWorker per graph worker to the task and call
//     StartComputeTask.
//
// It returns a non-nil error when any of the checks above fails or when
// StartComputeTask fails; only in the latter case is the task itself switched
// to the error state with an error message. On success the graph's using
// counter is incremented via AddUsingNum.
func (cts *computeTaskStarter) StartTask() error {
	taskInfo := cts.task
	space, name := taskInfo.SpaceName, taskInfo.GraphName

	accessMgr.Access(space, name)

	// A failing pre-start hook is reported but intentionally not returned.
	if err := cts.preStartTask(); err != nil {
		logrus.Errorf("failed to call `preStartTask` of `computeTask`, caused by: %v", err)
	}

	g := graphMgr.GetGraphByName(space, name)
	if g == nil {
		return fmt.Errorf("failed to retrieve graph with name: %s/%s", space, name)
	}

	// A graph without workers cannot run anything: flag it as errored.
	if len(g.Workers) == 0 {
		graphMgr.SetError(g)
		return fmt.Errorf("there are no GraphWorkers in this graph: %s/%s", space, name)
	}

	if err := tasks.CheckAlgoComputable(g, taskInfo.Params); err != nil {
		return err
	}

	// Graph data that currently lives on disk is read back before computing.
	if g.State == structure.GraphStateOnDisk {
		g.State = structure.GraphStateLoading
		if _, loaded := GraphPersistenceTask.Operate(space, name, Read); !loaded {
			graphMgr.SetError(g)
			// g.State is formatted after SetError has been called on the graph.
			return fmt.Errorf("graph load from disk error %s/%s: %s", space, name, g.State)
		}
		g.State = structure.GraphStateLoaded
	}

	cTask := computerTaskMgr.MakeTask(taskInfo)
	algoMaker := algorithmMgr.GetMaker(cTask.Algorithm)
	if algoMaker == nil {
		return fmt.Errorf("algorithm not exists: %s", cTask.Algorithm)
	}

	// Collect which optional graph data the algorithm declares it needs.
	needsOutDegree, needsOutEdges := false, false
	for _, requirement := range algoMaker.DataNeeded() {
		if requirement == compute.UseOutDegree {
			needsOutDegree = true
		} else if requirement == compute.UseOutEdge {
			needsOutEdges = true
		}
	}

	// Only data the graph does not already carry has to be set up.
	mustSetOutDegree := needsOutDegree && !g.UseOutDegree
	mustSetOutEdges := needsOutEdges && !g.UseOutEdges

	// When both are missing, the out-edges action is the one that is sent;
	// both flags are still recorded on the compute task below.
	action := pb.ComputeAction_Compute
	switch {
	case mustSetOutEdges:
		action = pb.ComputeAction_SettingOutEdges
	case mustSetOutDegree:
		action = pb.ComputeAction_SettingOutDegree
	}
	if mustSetOutEdges || mustSetOutDegree {
		cTask.SettingOutEdges = mustSetOutEdges
		cTask.SettingOutDegree = mustSetOutDegree
	}

	// Mirror the graph's workers onto the task, one entry per worker.
	for _, worker := range g.Workers {
		taskInfo.Workers = append(taskInfo.Workers, &structure.TaskWorker{Name: worker.Name})
	}

	if err := StartComputeTask(g, cTask, action); err != nil {
		taskInfo.SetState(structure.TaskStateError)
		taskInfo.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
		return err
	}

	g.AddUsingNum()
	return nil
}