in vermeer/apps/worker/setting_bl.go [337:459]
// ScatterOutDegree builds the out-degree array of the graph identified by
// spaceName/graphName and sends it to every worker listed in graph.Workers.
//
// The degrees are derived from in-edges: for every vertex index in
// [0, graph.Data.VertexCount) each source returned by GetInEdges has its
// counter incremented. The array (length Vertex.TotalVertexCount()) is then
// split into *computeTask.Parallel contiguous ranges, each serialized by its
// own goroutine. Every payload starts with the index of the first vertex it
// covers, followed by consecutive degree values; a payload is flushed to all
// peers whenever the encode buffer reports Full and once more at the end of
// the range.
//
// After all goroutines have finished, one more message with an empty payload
// and the boolean argument set to true is sent to every peer together with
// that peer's current SendCount value.
// NOTE(review): presumably this marks end-of-stream and tells the receiver how
// many messages to expect — confirm against GatherOutDegree.
//
// Panics raised here or inside a sender goroutine are recovered, logged and
// reported through SetStatusError; nothing is returned to the caller.
func (sb *SettingBl) ScatterOutDegree(taskID int32, spaceName string, graphName string) {
	defer func() {
		if r := recover(); r != nil {
			stack := common.GetCurrentGoroutineStack()
			sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutDegree panic recover panic:%v, stack message: %s", r,
				stack))
			logrus.Errorf("ScatterOutDegree panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				stack)
		}
	}()
	logrus.Infof("start scatter out degree, task_id:%v, space:%v, graph:%v", taskID, spaceName, graphName)

	graph := GraphMgr.GetGraphByName(spaceName, graphName)
	peers := make([]*PeerClient, 0, len(graph.Workers))
	for _, worker := range graph.Workers {
		peers = append(peers, PeerMgr.GetPeer(worker.Name))
	}

	// Count one outgoing edge on the source of every in-edge.
	totalVertices := graph.Data.Vertex.TotalVertexCount()
	outDegree := make([]serialize.SUint32, totalVertices)
	for v := uint32(0); v < graph.Data.VertexCount; v++ {
		for _, source := range graph.Data.Edges.GetInEdges(v) {
			outDegree[source]++
		}
	}

	computeTask := ComputeTaskMgr.GetTask(taskID)
	parallel := *computeTask.Parallel

	// Partition arithmetic is done in uint64: the vertex count is a uint32, so
	// converting it to int32 (or multiplying in int32) overflows for graphs with
	// more than 2^31-1 vertices. A zero parallel value still panics with a
	// division by zero and is handled by the recover above.
	partCnt := uint64(totalVertices)/uint64(parallel) + 1

	// broadcast delivers one payload to every peer: locally through
	// GatherOutDegree for the peer flagged Self, through the peer's
	// SettingAction handler otherwise. The per-peer SendCount is incremented
	// for every message; only the closing message (last == true) carries the
	// counter value, all others pass 0.
	broadcast := func(last bool, payload []byte) {
		for _, peer := range peers {
			atomic.AddInt32(computeTask.SendCount[peer.Name], 1)
			var count int32
			if last {
				count = atomic.LoadInt32(computeTask.SendCount[peer.Name])
			}
			if peer.Self {
				sb.GatherOutDegree(taskID, peer.Name, last, count, payload)
			} else {
				peer.SettingActionHandler.SettingAction(
					computeTask.Task.ID,
					pb.SettingAction_SetOutDegree,
					last,
					count,
					payload)
			}
		}
	}

	var wg sync.WaitGroup
	for i := int32(0); i < parallel; i++ {
		wg.Add(1)
		go func(pID int32) {
			// Registered first so it runs last: the recover handler below must
			// record the error status before wg.Wait() is released and the
			// closing message is broadcast.
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					stack := common.GetCurrentGoroutineStack()
					sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutDegree panic in goroutine recover panic:%v, stack message: %s",
						r, stack))
					logrus.Errorf("ScatterOutDegree panic recover in goroutine taskID:%v, pId:%v panic:%v, stack message: %s",
						taskID, pID, r, stack)
				}
			}()

			sendBuffer := buffer.EncodeBuffer{}
			sendBuffer.Init(BufferSize)
			if len(peers) == 0 {
				return
			}

			// Half-open vertex range [begin, end) handled by this goroutine.
			// Trailing partitions may be empty (begin >= end); they still send
			// a payload that holds only the offset.
			begin := partCnt * uint64(pID)
			end := begin + partCnt
			if end > uint64(totalVertices) {
				end = uint64(totalVertices)
			}

			// Marshal results are discarded, as in the original implementation.
			vOffset := serialize.SUint32(begin)
			_ = sendBuffer.Marshal(&vOffset)
			for v := begin; v < end; v++ {
				degree := outDegree[v]
				_ = sendBuffer.Marshal(&degree)
				if sendBuffer.Full() {
					broadcast(false, sendBuffer.PayLoad())
					sendBuffer.Reset()
					// The next payload starts at the vertex after the one just written.
					vOffset = serialize.SUint32(v + 1)
					_ = sendBuffer.Marshal(&vOffset)
				}
			}
			// Flush whatever is left (at least the offset header).
			broadcast(false, sendBuffer.PayLoad())
			sendBuffer.Reset()
		}(i)
	}
	wg.Wait()

	// Closing message: empty payload, flag set, counter value attached.
	broadcast(true, []byte{})
}