func()

in vermeer/apps/worker/setting_bl.go [337:459]


// ScatterOutDegree builds a per-vertex out-degree table for the graph
// identified by spaceName/graphName and sends it to every worker of that graph.
//
// The table has one slot per vertex (TotalVertexCount) and is filled by walking
// the in-edges of vertices [0, graph.Data.VertexCount) and incrementing the
// slot of each edge source. The table is then cut into `parallel` contiguous
// index ranges (parallel comes from the compute task registered under taskID)
// and each range is encoded and sent by its own goroutine. Every message
// starts with the index of the first vertex it covers, followed by one
// out-degree per vertex; a message is flushed to all peers whenever the buffer
// reports Full and once more when the range is exhausted.
//
// After all sender goroutines have returned, every peer receives a final
// message with end=true, an empty payload and the current value of that
// peer's SendCount. The local worker (peer.Self) is served by calling
// GatherOutDegree directly; other workers are reached through their
// SettingActionHandler.
//
// Panics raised here or in a sender goroutine are recovered, logged and
// recorded on the task through SetStatusError. A parallel value of zero ends
// up on that path as a division-by-zero panic.
func (sb *SettingBl) ScatterOutDegree(taskID int32, spaceName string, graphName string) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so the task status and the log agree.
			stack := common.GetCurrentGoroutineStack()
			sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutDegree panic recover panic:%v, stack message: %s", r,
				stack))
			logrus.Errorf("ScatterOutDegree panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				stack)
		}
	}()
	logrus.Infof("start scatter out degree, task_id:%v, space:%v, graph:%v", taskID, spaceName, graphName)

	graph := GraphMgr.GetGraphByName(spaceName, graphName)

	peers := make([]*PeerClient, 0, len(graph.Workers))
	for _, worker := range graph.Workers {
		peers = append(peers, PeerMgr.GetPeer(worker.Name))
	}

	totalCount := graph.Data.Vertex.TotalVertexCount()

	// An in-edge (source -> i) contributes one to the out-degree of source.
	outDegree := make([]serialize.SUint32, totalCount)
	for i := uint32(0); i < graph.Data.VertexCount; i++ {
		for _, source := range graph.Data.Edges.GetInEdges(i) {
			outDegree[source]++
		}
	}

	computeTask := ComputeTaskMgr.GetTask(taskID)
	parallel := *computeTask.Parallel

	// Range arithmetic is done in uint64. The vertex count is a uint32, so
	// narrowing it to int32 (as this code used to) turns negative from 2^31
	// vertices on, and begin+partCnt can wrap in uint32 near the top of the
	// range. For counts that fit in int32 the resulting ranges are unchanged.
	total := uint64(totalCount)
	partCnt := total/uint64(parallel) + 1

	wg := sync.WaitGroup{}
	for i := int32(0); i < parallel; i++ {
		wg.Add(1)
		go func(pID int32) {
			defer func() {
				if r := recover(); r != nil {
					stack := common.GetCurrentGoroutineStack()
					sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutDegree panic in goroutine recover panic:%v, stack message: %s",
						r, stack))
					logrus.Errorf("ScatterOutDegree panic recover in goroutine taskID:%v, pId:%v panic:%v, stack message: %s",
						taskID, pID, r, stack)
				}
			}()
			defer wg.Done()

			// Nothing to send to: skip encoding altogether.
			if len(peers) == 0 {
				return
			}

			sendBuffer := buffer.EncodeBuffer{}
			sendBuffer.Init(BufferSize)

			// flush delivers the current buffer content to every peer as a
			// non-final message, bumps each peer's send counter, and then
			// empties the buffer for the next message.
			flush := func() {
				for _, peer := range peers {
					atomic.AddInt32(computeTask.SendCount[peer.Name], 1)
					if peer.Self {
						sb.GatherOutDegree(
							taskID,
							peer.Name,
							false,
							0,
							sendBuffer.PayLoad())
					} else {
						peer.SettingActionHandler.SettingAction(
							computeTask.Task.ID,
							pb.SettingAction_SetOutDegree,
							false,
							0,
							sendBuffer.PayLoad())
					}
				}
				sendBuffer.Reset()
			}

			// Half-open vertex index range [bIdx, eIdx) owned by this goroutine.
			// bIdx may lie past the end for trailing partitions; the range is
			// then empty and only the offset header is sent.
			bIdx := partCnt * uint64(pID)
			eIdx := bIdx + partCnt
			if eIdx > total {
				eIdx = total
			}

			// Each message begins with the index of its first vertex.
			vOffset := serialize.SUint32(bIdx)
			_ = sendBuffer.Marshal(&vOffset)
			for v := bIdx; v < eIdx; v++ {
				degree := outDegree[v]
				//logrus.Debugf("vertex:%v outDegree:%v ", graph.Data.Vertex.GetVertex(v).Id, degree)
				_ = sendBuffer.Marshal(&degree)
				if sendBuffer.Full() {
					flush()
					// Start the next message at the vertex following v.
					vOffset = serialize.SUint32(v + 1)
					_ = sendBuffer.Marshal(&vOffset)
				}
			}
			// Send whatever is left (at least the offset header).
			flush()
		}(i)
	}
	wg.Wait()

	// Final message per peer: end=true, empty payload, and the peer's send
	// count including this message.
	for _, peer := range peers {
		atomic.AddInt32(computeTask.SendCount[peer.Name], 1)
		if peer.Self {
			sb.GatherOutDegree(
				taskID,
				peer.Name,
				true,
				atomic.LoadInt32(computeTask.SendCount[peer.Name]),
				[]byte{})
		} else {
			peer.SettingActionHandler.SettingAction(
				computeTask.Task.ID,
				pb.SettingAction_SetOutDegree,
				true,
				atomic.LoadInt32(computeTask.SendCount[peer.Name]),
				[]byte{})
		}
	}
}