func()

in vermeer/apps/master/bl/task_starter.go [186:273]


// StartTask prepares and launches the compute task held by this starter.
//
// The steps, in order:
//  1. Notify accessMgr about the task's graph and run preStartTask; a failure
//     of preStartTask is logged only and does not abort the start.
//  2. Look up the graph, require that it has at least one worker and that
//     tasks.CheckAlgoComputable accepts it for the task's params.
//  3. If the graph state is GraphStateOnDisk, read it back through
//     GraphPersistenceTask before continuing.
//  4. Create the compute task, resolve the algorithm maker and derive the
//     action to send from the maker's DataNeeded list.
//  5. Append one TaskWorker per graph worker to task.Workers and call
//     StartComputeTask.
//
// It returns a non-nil error if the graph is missing or has no workers, if the
// computability check fails, if loading from disk fails, if the algorithm has
// no maker, or if StartComputeTask fails. Only in the last case are the task's
// state and error message updated here. On success graph.AddUsingNum is
// called and nil is returned.
func (cts *computeTaskStarter) StartTask() error {
	task := cts.task

	accessMgr.Access(task.SpaceName, task.GraphName)

	// A preStartTask failure is reported but deliberately does not stop the start.
	if preErr := cts.preStartTask(); preErr != nil {
		logrus.Errorf("failed to call `preStartTask` of `computeTask`, caused by: %v", preErr)
	}

	g := graphMgr.GetGraphByName(task.SpaceName, task.GraphName)
	if g == nil {
		return fmt.Errorf("failed to retrieve graph with name: %s/%s", task.SpaceName, task.GraphName)
	}

	// A graph without workers cannot run anything; flag it via the graph manager.
	if len(g.Workers) == 0 {
		graphMgr.SetError(g)
		return fmt.Errorf("there are no GraphWorkers in this graph: %s/%s", task.SpaceName, task.GraphName)
	}

	if checkErr := tasks.CheckAlgoComputable(g, task.Params); checkErr != nil {
		return checkErr
	}

	// Graphs that currently live on disk are read back before computing.
	if g.State == structure.GraphStateOnDisk {
		g.State = structure.GraphStateLoading
		if _, loaded := GraphPersistenceTask.Operate(task.SpaceName, task.GraphName, Read); !loaded {
			graphMgr.SetError(g)
			return fmt.Errorf("graph load from disk error %s/%s: %s", task.SpaceName, task.GraphName, g.State)
		}
		g.State = structure.GraphStateLoaded
	}

	ctask := computerTaskMgr.MakeTask(task)
	algoMaker := algorithmMgr.GetMaker(ctask.Algorithm)
	if algoMaker == nil {
		return fmt.Errorf("algorithm not exists: %s", ctask.Algorithm)
	}

	// Collect which optional graph data the algorithm declares it needs.
	needsOutDegree, needsOutEdges := false, false
	for _, requirement := range algoMaker.DataNeeded() {
		if requirement == compute.UseOutDegree {
			needsOutDegree = true
		} else if requirement == compute.UseOutEdge {
			needsOutEdges = true
		}
	}

	// Data that is needed but not yet present on the graph has to be set up
	// first. When both are missing, the out-edges action is the one sent.
	setDegree := needsOutDegree && !g.UseOutDegree
	setEdges := needsOutEdges && !g.UseOutEdges

	action := pb.ComputeAction_Compute
	switch {
	case setEdges:
		action = pb.ComputeAction_SettingOutEdges
	case setDegree:
		action = pb.ComputeAction_SettingOutDegree
	}
	if setDegree || setEdges {
		ctask.SettingOutEdges = setEdges
		ctask.SettingOutDegree = setDegree
	}

	// Register every worker of the graph on the task.
	for _, gw := range g.Workers {
		task.Workers = append(task.Workers, &structure.TaskWorker{Name: gw.Name})
	}

	if startErr := StartComputeTask(g, ctask, action); startErr != nil {
		task.SetState(structure.TaskStateError)
		task.SetErrMsg(fmt.Sprintf("start compute task error:%v", startErr.Error()))
		return startErr
	}

	g.AddUsingNum()
	return nil
}