in vermeer/apps/master/bl/compute_task.go [36:169]
// ComputeTaskStatus handles a state report sent by a worker for a compute task
// and drives the master-side handling of that task.
//
// Parameters:
//   - taskId: ID used to look the task up in computerTaskMgr.
//   - state: worker-reported state, converted to structure.TaskState.
//   - workerName: name of the reporting worker.
//   - step: step reported by the worker; only used for TaskStateStepDone.
//   - computeValues: serialized compute.CValue payloads keyed by name; each is
//     unmarshaled and stored in the master context under workerName.
//   - errorMsg: passed to taskMgr.SetError when state is TaskStateError.
//
// Reports are ignored when the task is unknown, when the task is already in
// TaskStateError or TaskStateCanceled, when the task has no ComputeMaster, or
// when its graph cannot be found. Any panic raised while handling the report
// is recovered and logged; nothing is returned to the caller.
func (ctb *ComputeTaskBl) ComputeTaskStatus(
	taskId int32, state string, workerName string, step int32, computeValues map[string][]byte, errorMsg string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("ComputeTaskStatus panic recover taskID:%v, panic:%v,stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()

	computeTask := computerTaskMgr.GetTask(taskId)
	if computeTask == nil || computeTask.Task.State == structure.TaskStateError || computeTask.Task.State == structure.TaskStateCanceled {
		return
	}

	// Convert once; every comparison below uses the typed state.
	taskState := structure.TaskState(state)
	computeTask.Task.SetWorkerState(workerName, taskState)
	if computeTask.ComputeMaster == nil {
		return
	}

	ctx := computeTask.ComputeMaster.Context()
	for k, v := range computeValues {
		cv := compute.CValue{}
		// Previously the decode error was discarded. It is now surfaced in the
		// log; the value is still stored so the control flow is unchanged.
		if _, err := cv.Unmarshal(v); err != nil {
			logrus.Errorf("ComputeTaskStatus unmarshal compute value error, task: %d, worker: %s, key: %s, err: %v",
				taskId, workerName, k, err)
		}
		ctx.WorkerCValues[workerName][k] = &cv
	}

	// NOTE(review): this check runs before the error branch, so an error report
	// for a task whose graph is missing is dropped here — confirm that is intended.
	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph == nil {
		logrus.Errorf("graph not exist")
		return
	}
	logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s, step: %d",
		taskId, workerName, state, step)

	switch {
	case taskState == structure.TaskStateError:
		// A worker reported a failure: mark the task as errored and tear it down.
		logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		taskMgr.SetError(computeTask.Task, errorMsg)
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		graph.SubUsingNum()
		computeTask.FreeMemory()
		// The task entry is removed from computerTaskMgr one minute later.
		time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		}
		err := taskMgr.SaveTask(computeTask.Task.ID)
		if err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
		err = taskMgr.FinishTask(taskId)
		if err != nil {
			logrus.Errorf("compute task finished error:%v", err.Error())
		}

	case computeTask.Task.CheckTaskState(taskState):
		// CheckTaskState gates the transitions below; presumably it is true once
		// every worker has reported taskState — verify against its definition.
		switch taskState {
		case structure.TaskStateInitOK:
			// Initialisation finished: request the current step from each worker.
			computeTask.Task.SetState(structure.TaskStateStepDoing)
			for _, w := range computeTask.Task.Workers {
				// NOTE(review): wc is dereferenced without a nil check — confirm
				// GetWorker cannot return nil for a worker listed on the task.
				wc := workerMgr.GetWorker(w.Name)
				ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
					computeTask.Task.ID,
					computeTask.Step,
					false,
					nil)
			}

		case structure.TaskStateStepDone:
			ctx.AggregateValue()
			isContinue := computeTask.ComputeMaster.Compute()
			computeTask.Task.SetState(structure.TaskStateStepDoing)
			// AfterStep runs with the reported step; the counters are advanced
			// to step+1 only afterwards.
			computeTask.Step = step
			computeTask.ComputeMaster.Context().Step = step
			maxStep := options.GetInt(computeTask.Task.Params, "compute.max_step")
			// The next super step is flagged as the output step when the step
			// limit is reached or Compute reports that it should not continue.
			output := computeTask.Step >= int32(maxStep) || !isContinue
			if output {
				logrus.Infof("compute task done, cost: %v", time.Since(computeTask.Task.StartTime))
			}
			computeTask.ComputeMaster.AfterStep()
			computeTask.Step = step + 1
			computeTask.ComputeMaster.Context().Step = step + 1
			cValues := ctx.MarshalValues()
			for _, w := range computeTask.Task.Workers {
				// NOTE(review): same unchecked GetWorker result as in the InitOK case.
				wc := workerMgr.GetWorker(w.Name)
				ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
					computeTask.Task.ID,
					computeTask.Step,
					output,
					cValues)
			}

		case structure.TaskStateComplete:
			if options.GetInt(computeTask.Task.Params, "output.need_statistics") == 1 {
				ctb.computeStatistics(computeTask, graph)
			}
			logrus.Infof("compute task output complete, cost: %v", time.Since(computeTask.Task.StartTime))
			// computeTpResult is only called for sync-created tasks whose
			// algorithm type is AlgorithmOLTP.
			if computeTask.Task.CreateType == structure.TaskCreateSync {
				if algorithmMgr.GetMaker(computeTask.Algorithm).Type() == compute.AlgorithmOLTP {
					ctb.computeTpResult(computeTask)
				}
			}
			taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
			graph.SubUsingNum()
			computeTask.FreeMemory()
			// With output.need_query set to 1 the delayed delete is skipped and
			// the task entry stays in computerTaskMgr.
			needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
			if !needQuery {
				time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
			}
			common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
			if computeTask.Task.CreateType == structure.TaskCreateAsync {
				if err := Scheduler.CloseCurrent(taskId); err != nil {
					logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
				}
			} else if computeTask.Task.CreateType == structure.TaskCreateSync {
				computeTask.Task.GetWg().Done()
			}
			err := taskMgr.SaveTask(computeTask.Task.ID)
			if err != nil {
				logrus.Errorf("save task info error:%v", err)
			}
			err = taskMgr.FinishTask(taskId)
			if err != nil {
				logrus.Errorf("compute task finished error:%v", err.Error())
			}
		}
	}
}