in vermeer/apps/worker/setting_bl.go [461:545]
func (sb *SettingBl) GatherOutDegree(taskID int32, workerName string, end bool, endNum int32, data []byte) {
defer func() {
if r := recover(); r != nil {
sb.SetStatusError(taskID, fmt.Sprintf("GatherOutEdges panic recover panic:%v, stack message: %s", r,
common.GetCurrentGoroutineStack()))
logrus.Errorf("GatherOutEdges panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
common.GetCurrentGoroutineStack())
}
}()
logrus.Debugf("gather out degree worker:%v, end;%v, endnum:%v", workerName, end, endNum)
computeTask := ComputeTaskMgr.GetTask(taskID)
for i := 0; i < 100 && computeTask == nil; i++ {
//wait 100ms if computeTask not init.
logrus.Warnf("GatherOutEdges task id:%v is not available, wait 100ms", taskID)
time.Sleep(100 * time.Millisecond)
computeTask = ComputeTaskMgr.GetTask(taskID)
}
if !sb.CheckAction(computeTask) {
return
}
//defer sb.CheckAction(computeTask)
computeTask.StepWg.Wait()
computeTask.RecvWg.Add(1)
i := 0
vOffSet := serialize.SUint32(0)
if len(data) >= 4 {
n, _ := vOffSet.Unmarshal(data)
i += n
var outDegree serialize.SUint32
graph := GraphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
for i < len(data) {
n, err := outDegree.Unmarshal(data[i:])
//n, err := graph.Data.OutDegree[int(vOffSet)].Unmarshal(data[i:])
if err != nil {
sb.SetStatusError(taskID, fmt.Sprintf("setting graph gather outdegree error: %sb", err))
logrus.Errorf("setting graph gather outdegree error: %sb", err)
break
}
graph.Data.Edges.AddOutDegree(uint32(vOffSet), uint32(outDegree))
i += n
vOffSet += 1
}
}
computeTask.RecvWg.Done()
atomic.AddInt32(computeTask.RecvCount[workerName], 1)
if end {
// wait for all messages are processed
computeTask.RecvWg.Wait()
for i := 0; i < 100; i++ {
if atomic.LoadInt32(computeTask.RecvCount[workerName]) >= endNum {
break
}
logrus.Warnf("There are still buffer left to be processed. From worker:%v", workerName)
logrus.Debugf("recv count:%v ,end num:%v ", *computeTask.RecvCount[workerName], endNum)
time.Sleep(100 * time.Millisecond)
}
var allWorkerComplete bool
computeTask.Locker.Lock()
state := structure.TaskStateSettingOutDegreeOK
computeTask.Task.SetWorkerState(workerName, state)
allWorkerComplete = computeTask.Task.CheckTaskState(state)
computeTask.Locker.Unlock()
if allWorkerComplete {
//loadTask.Task.SetWorkerState(workerName, structure.TaskStateLoaded)
//if loadTask.Task.CheckTaskState(structure.TaskStateLoaded) {
computeTask.Task.SetState(state)
req := pb.SettingGraphReq{
WorkerName: ServiceWorker.WorkerName,
TaskId: taskID,
State: string(state),
}
ctx := context.Background()
_, err := ServiceWorker.MasterClient.SettingGraph(ctx, &req)
if err != nil {
logrus.Errorf("LoadTaskStatus error: %sb", err)
}
}
}
}