func()

in vermeer/apps/worker/setting_bl.go [127:253]


func (sb *SettingBl) ScatterOutEdges(taskID int32, spaceName string, graphName string) {
	defer func() {
		if r := recover(); r != nil {
			sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutEdges panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("ScatterOutEdges panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	logrus.Infof("start scatter out edges, task_id:%v, space:%v, graph:%v", taskID, spaceName, graphName)
	graph := GraphMgr.GetGraphByName(spaceName, graphName)
	computeTask := ComputeTaskMgr.GetTask(taskID)
	workerCount := len(graph.Workers)
	peers := make([]*PeerClient, 0, workerCount)
	for _, worker := range graph.Workers {
		peers = append(peers, PeerMgr.GetPeer(worker.Name))
	}
	if !sb.CheckAction(computeTask) {
		return
	}
	parallel := *computeTask.Parallel
	partCnt := int32(graph.Data.VertexCount)/parallel + 1
	wg := sync.WaitGroup{}
	for i := int32(0); i < parallel; i++ {
		wg.Add(1)
		go func(pID int32) {
			defer func() {
				if r := recover(); r != nil {
					sb.SetStatusError(taskID, fmt.Sprintf("ScatterOutEdges panic in goroutine recover panic:%v, stack message: %s",
						r, common.GetCurrentGoroutineStack()))
					logrus.Errorf("ScatterOutEdges panic recover in goroutine taskID:%v, pId:%v panic:%v, stack message: %s",
						taskID, pID, r, common.GetCurrentGoroutineStack())
				}
			}()
			defer wg.Done()
			sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
			for range graph.Workers {
				buf := buffer.EncodeBuffer{}
				buf.Init(BufferSize)
				sendBuffers = append(sendBuffers, buf)
			}
			bIdx := uint32(partCnt * pID)
			eIdx := bIdx + uint32(partCnt)
			if eIdx > graph.Data.VertexCount {
				eIdx = graph.Data.VertexCount
			}
			edge := structure.IntEdge{}
			for vertID := bIdx; vertID < eIdx; vertID++ {

				edge.Target = vertID + graph.Data.VertIDStart
				inEdges := graph.Data.Edges.GetInEdges(vertID)
				for _, source := range inEdges {
					edge.Source = uint32(source)
					sendWorkerIDx := -1
					for workerIDx, worker := range graph.Workers {
						if edge.Source >= worker.VertIdStart && edge.Source < worker.VertIdStart+worker.VertexCount {
							sendWorkerIDx = workerIDx
							break
						}
					}
					_ = sendBuffers[sendWorkerIDx].Marshal(&edge)
					if sendBuffers[sendWorkerIDx].Full() {
						atomic.AddInt32(computeTask.SendCount[peers[sendWorkerIDx].Name], 1)
						if peers[sendWorkerIDx].Self {
							sb.GatherOutEdges(
								computeTask.Task.ID,
								peers[sendWorkerIDx].Name,
								false,
								atomic.LoadInt32(computeTask.SendCount[peers[sendWorkerIDx].Name]),
								sendBuffers[sendWorkerIDx].PayLoad())
						} else {
							peers[sendWorkerIDx].SettingActionHandler.SettingAction(
								computeTask.Task.ID,
								pb.SettingAction_SetOutEdges,
								false,
								atomic.LoadInt32(computeTask.SendCount[peers[sendWorkerIDx].Name]),
								sendBuffers[sendWorkerIDx].PayLoad())
						}
						sendBuffers[sendWorkerIDx].Reset()
					}
				}
			}
			for i, peer := range peers {
				atomic.AddInt32(computeTask.SendCount[peers[i].Name], 1)
				if peer.Self {
					sb.GatherOutEdges(
						computeTask.Task.ID,
						peer.Name,
						false,
						atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
						sendBuffers[i].PayLoad())
				} else {
					peer.SettingActionHandler.SettingAction(
						computeTask.Task.ID,
						pb.SettingAction_SetOutEdges,
						false,
						atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
						sendBuffers[i].PayLoad())
				}
				sendBuffers[i].Reset()
			}

		}(i)
	}
	wg.Wait()
	for i := range peers {
		atomic.AddInt32(computeTask.SendCount[peers[i].Name], 1)
		if peers[i].Self {
			sb.GatherOutEdges(
				computeTask.Task.ID,
				peers[i].Name,
				true,
				atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
				[]byte{})
		} else {
			peers[i].SettingActionHandler.SettingAction(
				computeTask.Task.ID,
				pb.SettingAction_SetOutEdges,
				true,
				atomic.LoadInt32(computeTask.SendCount[peers[i].Name]),
				[]byte{})
		}
	}
	for s := range computeTask.SendCount {
		*computeTask.SendCount[s] = 0
	}
}