func()

in vermeer/apps/master/bl/compute_task.go [36:169]


// ComputeTaskStatus handles a status report sent by one worker for the compute task taskId.
//
// Parameters:
//   - taskId: ID of the compute task the report belongs to.
//   - state: the worker's new state, interpreted as a structure.TaskState.
//   - workerName: name of the reporting worker.
//   - step: superstep number reported by the worker (used for TaskStateStepDone).
//   - computeValues: serialized compute.CValue payloads keyed by value name.
//   - errorMsg: error description (used for TaskStateError).
//
// Reports for unknown tasks, or tasks already in TaskStateError/TaskStateCanceled, are
// ignored. Otherwise the worker's state and decoded values are recorded and the task is
// advanced according to the reported state:
//   - TaskStateError: the task is marked failed with errorMsg and finalized (graph usage
//     released, memory freed, task saved and finished, deletion scheduled after 1 minute).
//   - TaskStateInitOK: the task moves to TaskStateStepDoing and the first superstep is
//     dispatched to every worker.
//   - TaskStateStepDone: values are aggregated, the master Compute runs, and the next
//     superstep is dispatched; output is requested once "compute.max_step" is reached or
//     Compute returns false.
//   - TaskStateComplete: optional statistics / OLTP results are produced, the task is
//     forced to TaskStateComplete and finalized.
//
// The last three branches only run when Task.CheckTaskState accepts the reported state.
// Any panic is recovered and logged together with the goroutine stack.
func (ctb *ComputeTaskBl) ComputeTaskStatus(
	taskId int32, state string, workerName string, step int32, computeValues map[string][]byte, errorMsg string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("ComputeTaskStatus panic recover taskID:%v, panic:%v,stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	computeTask := computerTaskMgr.GetTask(taskId)
	if computeTask == nil || computeTask.Task.State == structure.TaskStateError || computeTask.Task.State == structure.TaskStateCanceled {
		return
	}
	taskState := structure.TaskState(state)
	computeTask.Task.SetWorkerState(workerName, taskState)
	if computeTask.ComputeMaster == nil {
		return
	}
	ctx := computeTask.ComputeMaster.Context()
	for k, v := range computeValues {
		cv := compute.CValue{}
		if _, err := cv.Unmarshal(v); err != nil {
			// A partially decoded value would silently corrupt AggregateValue; surface and skip it.
			logrus.Errorf("ComputeTaskStatus task: %d, worker: %s, unmarshal compute value %q error: %v",
				taskId, workerName, k, err)
			continue
		}
		ctx.WorkerCValues[workerName][k] = &cv
	}
	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph == nil {
		logrus.Errorf("ComputeTaskStatus task: %d, graph not exist: %s/%s",
			taskId, computeTask.Task.SpaceName, computeTask.Task.GraphName)
		return
	}
	logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s, step: %d",
		taskId, workerName, state, step)

	if taskState == structure.TaskStateError {
		logrus.Errorf("ComputeTaskStatus task: %d, worker: %s, state: %s, error: %s",
			taskId, workerName, state, errorMsg)
		// NOTE(review): two workers reporting errors concurrently could both pass the state
		// check above before SetError lands, calling Done/SubUsingNum twice — confirm that
		// reports for one task are serialized by the caller.
		taskMgr.SetError(computeTask.Task, errorMsg)
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		graph.SubUsingNum()
		computeTask.FreeMemory()
		time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		}
		if err := taskMgr.SaveTask(computeTask.Task.ID); err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
		if err := taskMgr.FinishTask(taskId); err != nil {
			logrus.Errorf("compute task finished error:%v", err.Error())
		}
		return
	}

	// NOTE(review): CheckTaskState presumably returns true only once every worker has
	// reported taskState — confirm against its implementation.
	if !computeTask.Task.CheckTaskState(taskState) {
		return
	}

	switch taskState {
	case structure.TaskStateInitOK:
		computeTask.Task.SetState(structure.TaskStateStepDoing)
		for _, w := range computeTask.Task.Workers {
			// NOTE(review): a nil worker here would panic (recovered above) — confirm
			// GetWorker cannot return nil for a task's registered worker.
			wc := workerMgr.GetWorker(w.Name)
			ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
				computeTask.Task.ID,
				computeTask.Step,
				false,
				nil)
		}

	case structure.TaskStateStepDone:
		ctx.AggregateValue()
		isContinue := computeTask.ComputeMaster.Compute()
		computeTask.Task.SetState(structure.TaskStateStepDoing)
		computeTask.Step = step
		computeTask.ComputeMaster.Context().Step = step
		maxStep := options.GetInt(computeTask.Task.Params, "compute.max_step")
		// The next superstep is the output step when the step budget is exhausted or
		// the master's Compute asked to stop.
		output := computeTask.Step >= int32(maxStep) || !isContinue
		if output {
			logrus.Infof("compute task done, cost: %v", time.Since(computeTask.Task.StartTime))
		}
		computeTask.ComputeMaster.AfterStep()
		computeTask.Step = step + 1
		computeTask.ComputeMaster.Context().Step = step + 1
		cValues := ctx.MarshalValues()
		for _, w := range computeTask.Task.Workers {
			wc := workerMgr.GetWorker(w.Name)
			ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
				computeTask.Task.ID,
				computeTask.Step,
				output,
				cValues)
		}

	case structure.TaskStateComplete:
		if options.GetInt(computeTask.Task.Params, "output.need_statistics") == 1 {
			ctb.computeStatistics(computeTask, graph)
		}
		logrus.Infof("compute task output complete, cost: %v", time.Since(computeTask.Task.StartTime))
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			if algorithmMgr.GetMaker(computeTask.Algorithm).Type() == compute.AlgorithmOLTP {
				ctb.computeTpResult(computeTask)
			}
		}
		taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
		graph.SubUsingNum()
		computeTask.FreeMemory()
		// Tasks whose results may still be queried are kept; others are dropped after a minute.
		needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
		if !needQuery {
			time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
		}
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		} else if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		if err := taskMgr.SaveTask(computeTask.Task.ID); err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
		if err := taskMgr.FinishTask(taskId); err != nil {
			logrus.Errorf("compute task finished error:%v", err.Error())
		}
	}
}