in vermeer/apps/worker/setting_bl.go [127:253]
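// ScatterOutEdges redistributes edge data for a compute task: every in-edge held
// locally is sent to the worker that owns its source vertex, where it is gathered
// as an out-edge. The local vertex range is split across *computeTask.Parallel
// goroutines, edges are batched in per-peer encode buffers, and a final message
// with an empty payload signals to each peer that this worker has finished scattering.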
func (sb *SettingBl) ScatterOutEdges(taskID int32, spaceName string, graphName string) {
defer func() {
if r := recover(); r != nil {
sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutEdges recovered from panic:%v, stack: %s", r,
common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterOutEdges recovered from panic, taskID:%v, panic:%v, stack: %s", taskID, r,
common.GetCurrentGoroutineStack())
}
}()
logrus.Infof("start scatter out edges, task_id:%v, space:%v, graph:%v", taskID, spaceName, graphName)
graph := GraphMgr.GetGraphByName(spaceName, graphName)
computeTask := ComputeTaskMgr.GetTask(taskID)
workerCount := len(graph.Workers)
peers := make([]*PeerClient, 0, workerCount)
for _, worker := range graph.Workers {
peers = append(peers, PeerMgr.GetPeer(worker.Name))
}
if !sb.CheckAction(computeTask) {
return
}
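// Split the local vertex range into roughly equal chunks, one per goroutine.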
parallel := *computeTask.Parallel
partCnt := int32(graph.Data.VertexCount)/parallel + 1
wg := sync.WaitGroup{}
for i := int32(0); i < parallel; i++ {
wg.Add(1)
go func(pID int32) {
defer func() {
if r := recover(); r != nil {
sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutEdges goroutine recovered from panic:%v, stack: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterOutEdges goroutine recovered from panic, taskID:%v, pID:%v, panic:%v, stack: %s",
taskID, pID, r, common.GetCurrentGoroutineStack())
}
}()
defer wg.Done()
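// Each goroutine keeps its own encode buffer per destination worker;
// buffers are never shared across goroutines.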
sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
for range graph.Workers {
buf := buffer.EncodeBuffer{}
buf.Init(BufferSize)
sendBuffers = append(sendBuffers, buf)
}
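// Half-open local vertex ID range [bIdx, eIdx) handled by this goroutine,
// clamped to the total vertex count.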
bIdx := uint32(partCnt * pID)
eIdx := bIdx + uint32(partCnt)
if eIdx > graph.Data.VertexCount {
eIdx = graph.Data.VertexCount
}
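// For each local vertex, walk its in-edges. An in-edge (source -> vertID) is an
// out-edge of the source vertex, so it is routed to the worker that owns source.
// The target is converted to a global ID by adding this worker's VertIDStart.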
edge := structure.IntEdge{}
for vertID := bIdx; vertID < eIdx; vertID++ {
edge.Target = vertID + graph.Data.VertIDStart
inEdges := graph.Data.Edges.GetInEdges(vertID)
for _, source := range inEdges {
edge.Source = uint32(source)
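// Locate the worker whose vertex ID range contains the source. This assumes the
// worker ranges cover every source ID; otherwise sendWorkerIDx stays -1 and the
// buffer indexing below would panic.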
sendWorkerIDx := -1
for workerIDx, worker := range graph.Workers {
if edge.Source >= worker.VertIdStart && edge.Source < worker.VertIdStart+worker.VertexCount {
sendWorkerIDx = workerIDx
break
}
}
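// Batch the edge; once the buffer is full, deliver it with the running send count:
// directly through GatherOutEdges when the destination peer is this worker itself,
// otherwise through the peer's SettingAction handler.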
_ = sendBuffers[sendWorkerIDx].Marshal(&edge)
if sendBuffers[sendWorkerIDx].Full() {
atomic.AddInt32(computeTask.SendCount[peers[sendWorkerIDx].Name], 1)
if peers[sendWorkerIDx].Self {
sb.GatherOutEdges(
computeTask.Task.ID,
peers[sendWorkerIDx].Name,
false,
atomic.LoadInt32(computeTask.SendCount[peers[sendWorkerIDx].Name]),
sendBuffers[sendWorkerIDx].PayLoad())
} else {
peers[sendWorkerIDx].SettingActionHandler.SettingAction(
computeTask.Task.ID,
pb.SettingAction_SetOutEdges,
false,
atomic.LoadInt32(computeTask.SendCount[peers[sendWorkerIDx].Name]),
sendBuffers[sendWorkerIDx].PayLoad())
}
sendBuffers[sendWorkerIDx].Reset()
}
}
}
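// Flush whatever is left in this goroutine's buffers to every peer.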
for i, peer := range peers {
atomic.AddInt32(computeTask.SendCount[peers[i].Name], 1)
if peer.Self {
sb.GatherOutEdges(
computeTask.Task.ID,
peer.Name,
false,
atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
} else {
peer.SettingActionHandler.SettingAction(
computeTask.Task.ID,
pb.SettingAction_SetOutEdges,
false,
atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
}
sendBuffers[i].Reset()
}
}(i)
}
wg.Wait()
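// All goroutines have finished: send each peer one final, empty-payload message
// with the flag set to true to mark the end of the stream, carrying the final
// send count for that peer.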
for i := range peers {
atomic.AddInt32(computeTask.SendCount[peers[i].Name], 1)
if peers[i].Self {
sb.GatherOutEdges(
computeTask.Task.ID,
peers[i].Name,
true,
atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
[]byte{})
} else {
peers[i].SettingActionHandler.SettingAction(
computeTask.Task.ID,
pb.SettingAction_SetOutEdges,
true,
atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
[]byte{})
}
}
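// Reset the per-peer send counters for reuse.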
for s := range computeTask.SendCount {
*computeTask.SendCount[s] = 0
}
}