vermeer/apps/master/bl/compute_task.go (271 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package bl
import (
"fmt"
"time"
"vermeer/apps/common"
"vermeer/apps/compute"
. "vermeer/apps/master/graphs"
"vermeer/apps/options"
pb "vermeer/apps/protos"
"vermeer/apps/structure"
"github.com/sirupsen/logrus"
)
// ComputeTaskBl groups the master-side handlers for compute tasks: worker
// status reports (ComputeTaskStatus, SettingGraphStatus) and cancellation
// (Canceled). It carries no fields of its own; its methods work through the
// package-level managers (computerTaskMgr, graphMgr, taskMgr, ...).
type ComputeTaskBl struct{}
// ComputeTaskStatus handles a status report sent by a worker for a running
// compute task and advances the task's state machine on the master.
//
// Parameters:
//   - taskId: ID of the compute task the report belongs to.
//   - state: the worker's new task state (converted to structure.TaskState).
//   - workerName: name of the reporting worker.
//   - step: the superstep the report refers to.
//   - computeValues: serialized compute.CValue entries keyed by name; they are
//     decoded into the master context's WorkerCValues for this worker.
//   - errorMsg: error text recorded on the task when state is TaskStateError.
//
// Reports for unknown tasks, or for tasks already in the error or canceled
// state, are ignored. The worker's state is always recorded first. A
// TaskStateError report is handled unconditionally; the other states are acted
// on only when Task.CheckTaskState(state) returns true:
//   - TaskStateInitOK: move to TaskStateStepDoing and start the current step
//     on every worker.
//   - TaskStateStepDone: aggregate the workers' values, run the master's
//     Compute/AfterStep, and start the next superstep; the output flag is set
//     once "compute.max_step" is reached or Compute returns false.
//   - TaskStateComplete: optionally compute statistics / OLTP results, force
//     the task to TaskStateComplete and release its resources.
//
// Any panic raised while handling the report is recovered and logged.
func (ctb *ComputeTaskBl) ComputeTaskStatus(
	taskId int32, state string, workerName string, step int32, computeValues map[string][]byte, errorMsg string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("ComputeTaskStatus panic recover taskID:%v, panic:%v,stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	computeTask := computerTaskMgr.GetTask(taskId)
	// Drop reports for tasks that are gone or have already failed / been canceled.
	if computeTask == nil || computeTask.Task.State == structure.TaskStateError || computeTask.Task.State == structure.TaskStateCanceled {
		return
	}
	computeTask.Task.SetWorkerState(workerName, structure.TaskState(state))
	if computeTask.ComputeMaster == nil {
		return
	}
	ctx := computeTask.ComputeMaster.Context()
	// Decode the worker-reported values into the master context.
	// NOTE(review): Unmarshal errors are discarded, so a corrupt payload is stored
	// as a partially decoded CValue. This also assumes ctx.WorkerCValues[workerName]
	// was initialized elsewhere; writing into a nil inner map would panic.
	for k, v := range computeValues {
		cv := compute.CValue{}
		_, _ = cv.Unmarshal(v)
		ctx.WorkerCValues[workerName][k] = &cv
	}
	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph == nil {
		logrus.Errorf("graph not exist")
		return
	}
	logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s, step: %d",
		taskId, workerName, state, step)
	if structure.TaskState(state) == structure.TaskStateError {
		// A worker failed: mark the task as errored and release everything it holds.
		logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		//computeTask.Task.SetState(structure.TaskStateError)
		//computeTask.Task.SetErrMsg(errorMsg)
		taskMgr.SetError(computeTask.Task, errorMsg)
		// Release the caller waiting on a synchronously created task.
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		//atomic.AddInt32(&graph.UsingNum, -1)
		graph.SubUsingNum()
		computeTask.FreeMemory()
		// Remove the task from the compute task manager after one minute.
		time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		// Asynchronously created tasks are also closed in the scheduler.
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		}
		err := taskMgr.SaveTask(computeTask.Task.ID)
		if err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
		err = taskMgr.FinishTask(taskId)
		if err != nil {
			logrus.Errorf("compute task finished error:%v", err.Error())
		}
	} else if computeTask.Task.CheckTaskState(structure.TaskState(state)) {
		// NOTE(review): CheckTaskState presumably returns true only once every
		// worker has reported this state — confirm in structure.TaskInfo.
		if structure.TaskState(state) == structure.TaskStateInitOK {
			// Initialization done: start the current superstep on every worker.
			computeTask.Task.SetState(structure.TaskStateStepDoing)
			for _, w := range computeTask.Task.Workers {
				// NOTE(review): GetWorker's result is not nil-checked; a missing
				// worker panics here (recovered by the deferred handler above).
				wc := workerMgr.GetWorker(w.Name)
				//wc.SuperStepServer.AsyncSuperStep(
				ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
					computeTask.Task.ID,
					computeTask.Step,
					false,
					nil)
			}
		} else if structure.TaskState(state) == structure.TaskStateStepDone {
			// A superstep finished: merge worker values and run the master-side step.
			ctx.AggregateValue()
			output := false
			isContinue := computeTask.ComputeMaster.Compute()
			computeTask.Task.SetState(structure.TaskStateStepDoing)
			//TaskMgr.ForceState(computeTask.Task, structure.TaskStateStepDoing)
			// Record the reported step before AfterStep runs.
			computeTask.Step = step
			computeTask.ComputeMaster.Context().Step = step
			maxStep := options.GetInt(computeTask.Task.Params, "compute.max_step")
			// Ask workers to produce output once the step limit is reached or the
			// master's Compute signals that iteration should stop.
			if computeTask.Step >= int32(maxStep) || !isContinue {
				output = true
				logrus.Infof("compute task done, cost: %v", time.Since(computeTask.Task.StartTime))
			}
			computeTask.ComputeMaster.AfterStep()
			// Advance to the next superstep and broadcast the master's values with it.
			computeTask.Step = step + 1
			computeTask.ComputeMaster.Context().Step = step + 1
			cValues := ctx.MarshalValues()
			for _, w := range computeTask.Task.Workers {
				wc := workerMgr.GetWorker(w.Name)
				//wc.SuperStepServer.AsyncSuperStep(
				ServerMgr.SuperStepServer(wc.Name).AsyncSuperStep(
					computeTask.Task.ID,
					computeTask.Step,
					output,
					cValues)
			}
		} else if structure.TaskState(state) == structure.TaskStateComplete {
			// All output is written: finalize results and release the task's resources.
			if options.GetInt(computeTask.Task.Params, "output.need_statistics") == 1 {
				ctb.computeStatistics(computeTask, graph)
			}
			//computeTask.Task.SetState(structure.TaskStateComplete)
			logrus.Infof("compute task output complete, cost: %v", time.Since(computeTask.Task.StartTime))
			// Synchronously created OLTP tasks get their result assembled on the master.
			// NOTE(review): GetMaker's result is not nil-checked.
			if computeTask.Task.CreateType == structure.TaskCreateSync {
				if algorithmMgr.GetMaker(computeTask.Algorithm).Type() == compute.AlgorithmOLTP {
					ctb.computeTpResult(computeTask)
				}
			}
			taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
			graph.SubUsingNum()
			computeTask.FreeMemory()
			// Tasks with output.need_query == 1 are not scheduled for deletion here
			// (presumably so their results stay queryable — confirm).
			needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
			if !needQuery {
				time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskId) })
			}
			common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
			if computeTask.Task.CreateType == structure.TaskCreateAsync {
				if err := Scheduler.CloseCurrent(taskId); err != nil {
					logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
				}
			} else if computeTask.Task.CreateType == structure.TaskCreateSync {
				computeTask.Task.GetWg().Done()
			}
			err := taskMgr.SaveTask(computeTask.Task.ID)
			if err != nil {
				logrus.Errorf("save task info error:%v", err)
			}
			err = taskMgr.FinishTask(taskId)
			if err != nil {
				logrus.Errorf("compute task finished error:%v", err.Error())
			}
		}
	}
}
// computeStatistics aggregates the per-worker statistics gathered on
// computeTask into computeTask.Task.StatisticsResult, merging in the compute
// master's own Statistics() entries.
//
// The statistics mode is read from the "output.statistics_mode" task
// parameter. When it is empty, one of the modes returned by the algorithm's
// SupportedStatistics is used. When the mode is not supported by the
// algorithm, an error message listing the available modes is recorded on the
// task and no statistics are produced. Continuing in that case would build a
// statistics master for an unsupported mode.
//
// computeTask.Statistics is always cleared on return so the collected
// per-worker data can be released.
func (ctb *ComputeTaskBl) computeStatistics(computeTask *compute.ComputerTask, graph *structure.VermeerGraph) {
	//master output result
	defer func() {
		computeTask.Statistics = nil
	}()
	mode := compute.StatisticsType(options.GetString(computeTask.Task.Params, "output.statistics_mode"))
	maker := algorithmMgr.GetMaker(computeTask.Algorithm)
	statistics := maker.SupportedStatistics()
	if mode == "" {
		// NOTE(review): map iteration order is random, so with more than one
		// supported mode the default choice is not deterministic.
		for statisticsType := range statistics {
			mode = statisticsType
			break
		}
	}
	if _, ok := statistics[mode]; !ok {
		sKeys := make([]compute.StatisticsType, 0, len(statistics))
		for statisticsType := range statistics {
			sKeys = append(sKeys, statisticsType)
		}
		msg := fmt.Sprintf("algorithm %v not support statistics:%v. The options available are: %v ", computeTask.Algorithm, mode, sKeys)
		logrus.Error(msg)
		computeTask.Task.SetErrMsg(msg)
		// Stop here. Aggregating with an unsupported mode cannot give a valid
		// result, and a panic would abort the caller's completion cleanup.
		return
	}
	statisticsMaster := compute.StatisticsMasterMaker(mode)
	smContext := statisticsMaster.MakeContext()
	smContext.Graph = graph
	statisticsMaster.Init(computeTask.Task.Params)
	for _, statistic := range computeTask.Statistics {
		statisticsMaster.Aggregate(statistic)
	}
	output := statisticsMaster.Output()
	// Entries reported by the compute master override same-named aggregated ones.
	for k, v := range computeTask.ComputeMaster.Statistics() {
		output[k] = v
	}
	// write to task info
	computeTask.Task.StatisticsResult = output
	// outputType := options.GetString(computeTask.Task.Params, "output.type")
	// writer := graphio.MakeWriter(outputType)
	// writerInitInfo := graphio.WriterInitInfo{
	// 	Params: computeTask.Task.Params,
	// 	Mode:   graphio.WriteModeStatistics,
	// }
	// err := writer.Init(writerInitInfo)
	// if err != nil {
	// 	//computeTask.Task.SetState(structure.TaskStateError)
	// 	//computeTask.Task.SetErrMsg(fmt.Sprintf("write statistics init error:%v", err))
	// 	taskMgr.SetError(computeTask.Task, fmt.Sprintf("write statistics init error:%v", err))
	// }
	// err = writer.WriteStatistics(output)
	// if err != nil {
	// 	//computeTask.Task.SetState(structure.TaskStateError)
	// 	//computeTask.Task.SetErrMsg(fmt.Sprintf("write statistics error:%v", err))
	// 	taskMgr.SetError(computeTask.Task, fmt.Sprintf("write statistics error:%v", err))
	// }
	// writer.Close()
}
// computeTpResult builds the final result of a TP compute task on the master.
// It passes the per-worker results collected in computeTask.TpResult.WorkerResult
// to the compute master's Output and stores the outcome in computeTask.TpResult.Output.
func (ctb *ComputeTaskBl) computeTpResult(computeTask *compute.ComputerTask) {
	//master output result
	master := computeTask.ComputeMaster
	workerResults := computeTask.TpResult.WorkerResult
	computeTask.TpResult.Output = master.Output(workerResults)
}
// Canceled releases the master-side resources of a compute task that has been
// canceled. It performs these steps in order:
//   - decrement the using count of the task's graph, if the graph still exists;
//   - decrement the running-task metric;
//   - force the task into TaskStateCanceled;
//   - free the task's memory;
//   - remove the task from the compute task manager one minute later.
//
// A nil computeTask, or one without an attached Task, is logged and ignored.
func (ctb *ComputeTaskBl) Canceled(computeTask *compute.ComputerTask) {
	if computeTask == nil || computeTask.Task == nil {
		logrus.Errorf("cancel computeTask is nil")
		return
	}
	// Capture the ID by value now, as the other cleanup paths do with taskId.
	// The deletion callback runs on its own goroutine a minute after FreeMemory,
	// and a panic there is not covered by any recover.
	taskID := computeTask.Task.ID
	logrus.Infof("task has been canceled, task_id:%v", taskID)
	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph != nil {
		graph.SubUsingNum()
	}
	common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
	taskMgr.ForceState(computeTask.Task, structure.TaskStateCanceled)
	computeTask.FreeMemory()
	time.AfterFunc(1*time.Minute, func() { computerTaskMgr.DeleteTask(taskID) })
}
// SettingGraphStatus handles a worker's status report for the graph-setting
// phase (out-edge / out-degree preparation) that runs before a compute task's
// main computation.
//
// Parameters:
//   - taskId: ID of the compute task the report belongs to.
//   - state: the worker's new task state (converted to structure.TaskState).
//   - workerName: name of the reporting worker.
//   - errorMsg: error text recorded on the task when state is TaskStateError.
//
// Reports for unknown tasks, or for tasks already in the error or canceled
// state, are ignored. A TaskStateError report is handled unconditionally; the
// other states are acted on only when Task.CheckTaskState(state) returns true:
//   - TaskStateSettingOutEdgesOK: set graph.UseOutEdges. Then either start the
//     out-degree setting action (when computeTask.SettingOutDegree is set), or
//     persist the graph in the background and start the compute action.
//   - TaskStateSettingOutDegreeOK: set graph.UseOutDegree, persist the graph in
//     the background and start the compute action.
//
// Any panic raised while handling the report is recovered and logged.
func (ctb *ComputeTaskBl) SettingGraphStatus(
	taskId int32, state string, workerName string, errorMsg string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("ComputeTask SettingGraph panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	computeTask := computerTaskMgr.GetTask(taskId)
	// Drop reports for tasks that are gone or have already failed / been canceled.
	if computeTask == nil || computeTask.Task.State == structure.TaskStateError || computeTask.Task.State == structure.TaskStateCanceled {
		return
	}
	computeTask.Task.SetWorkerState(workerName, structure.TaskState(state))
	graph := graphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
	if graph == nil {
		logrus.Errorf("graph not exist")
		return
	}
	logrus.Infof("ComputeTask SettingGraph task: %d, worker: %s, state: %s",
		taskId, workerName, state)
	if structure.TaskState(state) == structure.TaskStateError {
		// NOTE(review): this log line is labeled "ComputeTaskStatus" although it is
		// emitted from SettingGraphStatus.
		logrus.Infof("ComputeTaskStatus task: %d, worker: %s, state: %s", taskId, workerName, state)
		// NOTE(review): unlike the error path in ComputeTaskStatus, this path uses
		// SetState/SetErrMsg instead of taskMgr.SetError, and calls neither
		// taskMgr.FinishTask nor the delayed computerTaskMgr.DeleteTask.
		// Confirm this difference is intended.
		computeTask.Task.SetState(structure.TaskStateError)
		computeTask.Task.SetErrMsg(errorMsg)
		// Release the caller waiting on a synchronously created task.
		if computeTask.Task.CreateType == structure.TaskCreateSync {
			computeTask.Task.GetWg().Done()
		}
		//atomic.AddInt32(&graph.UsingNum, -1)
		graph.SubUsingNum()
		computeTask.FreeMemory()
		common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(computeTask.Task.Type).Dec()
		// Asynchronously created tasks are also closed in the scheduler.
		if computeTask.Task.CreateType == structure.TaskCreateAsync {
			if err := Scheduler.CloseCurrent(taskId); err != nil {
				logrus.Errorf("failed to close task with ID: %d,err:%v", taskId, err)
			}
		}
		err := taskMgr.SaveTask(computeTask.Task.ID)
		if err != nil {
			logrus.Errorf("save task info error:%v", err)
		}
	} else if computeTask.Task.CheckTaskState(structure.TaskState(state)) {
		// NOTE(review): CheckTaskState presumably returns true only once every
		// worker has reported this state — confirm in structure.TaskInfo.
		// NOTE(review): when StartComputeTask fails below, the task is only marked
		// as errored. The sync WaitGroup, graph using count, running-task metric
		// and scheduler slot are not released here, unlike the TaskStateError
		// branch above. Confirm whether StartComputeTask or another path does so.
		if structure.TaskState(state) == structure.TaskStateSettingOutEdgesOK {
			graph.UseOutEdges = true
			computeTask.Task.SetState(structure.TaskStateSettingOutEdgesOK)
			if computeTask.SettingOutDegree {
				// Out-degrees are still needed: run that setting step before computing.
				err := StartComputeTask(graph, computeTask, pb.ComputeAction_SettingOutDegree)
				if err != nil {
					computeTask.Task.SetState(structure.TaskStateError)
					computeTask.Task.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
					return
				}
			} else {
				// Start persisting the graph to disk in the background.
				go func() {
					_, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk)
					if !ok {
						logrus.Errorf("graph %v write disk failed", graph.Name)
					}
				}()
				err := StartComputeTask(graph, computeTask, pb.ComputeAction_Compute)
				if err != nil {
					computeTask.Task.SetState(structure.TaskStateError)
					computeTask.Task.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
					return
				}
			}
		} else if structure.TaskState(state) == structure.TaskStateSettingOutDegreeOK {
			graph.UseOutDegree = true
			// Start persisting the graph to disk in the background.
			go func() {
				_, ok := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, WriteDisk)
				if !ok {
					logrus.Errorf("graph %v write disk failed", graph.Name)
				}
			}()
			computeTask.Task.SetState(structure.TaskStateSettingOutDegreeOK)
			err := StartComputeTask(graph, computeTask, pb.ComputeAction_Compute)
			if err != nil {
				computeTask.Task.SetState(structure.TaskStateError)
				computeTask.Task.SetErrMsg(fmt.Sprintf("start compute task error:%v", err.Error()))
				return
			}
		}
	}
}