func()

in vermeer/apps/worker/load_graph_bl.go [179:369]


func (lb *LoadGraphBl) ScatterVertex(taskID int32) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("ScatterVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskID)
	if !lb.CheckAction(loadTask) {
		return
	}
	//defer lb.CheckAction(loadTask)
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)

	ctx := context.Background()
	req := pb.GetGraphWorkersReq{
		GraphName: loadTask.Task.GraphName,
		SpaceName: loadTask.Task.SpaceName,
	}
	resp, err := ServiceWorker.MasterClient.GetGraphWorkers(ctx, &req)
	if err != nil {
		lb.SetStatusError(taskID, fmt.Sprintf("GetGraphWorkers error: %s", err))
		logrus.Errorf("GetGraphWorkers error: %s", err)
		return
	}

	for _, w := range resp.Workers {
		graph.SetWorkerVertexCount(w.Name, w.VertexCount, w.VertIdStart)
	}

	workerCount := len(loadTask.Task.Workers)

	graph.RecastVertex()

	loadTask.LoadWg.Done()
	peers := make([]*PeerClient, 0, workerCount-1)
	for _, wn := range loadTask.Task.Workers {
		if wn.Name == ServiceWorker.WorkerName {
			lb.GatherVertex(
				loadTask.Task.ID,
				wn.Name,
				true,
				1,
				[]byte{})
			continue
		}
		peers = append(peers, PeerMgr.GetPeer(wn.Name))
	}

	// skip if only on worker
	if len(peers) == 0 {
		return
	}

	// sendBuffer := buffer.EncodeBuffer{}
	// sendBuffer.Init(BufferSize)

	localGw := graph.GetGraphWorker(ServiceWorker.WorkerName)
	parallel := options.GetInt(loadTask.Task.Params, "load.parallel")
	if parallel <= 0 {
		logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
		parallel = 1
	} else if parallel > 10 {
		parallel = 10
	}

	sendBuffers := make([]buffer.EncodeBuffer, parallel)
	for i := range sendBuffers {
		sendBuffers[i] = buffer.EncodeBuffer{}
		sendBuffers[i].Init(BufferSize)
	}

	partCnt := int(localGw.VertexCount)/parallel + 1
	wg := &sync.WaitGroup{}
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go func(pID int) {
			defer func() {
				if r := recover(); r != nil {
					lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s",
						r, common.GetCurrentGoroutineStack()))
					logrus.Errorf("ScatterVertex panic recover taskID:%v, pId:%v panic:%v, stack message: %s",
						taskID, pID, r, common.GetCurrentGoroutineStack())
				}
			}()
			defer wg.Done()
			bIdx := uint32(partCnt*pID) + localGw.VertIdStart
			eIdx := bIdx + uint32(partCnt)
			if eIdx > localGw.VertIdStart+localGw.VertexCount {
				eIdx = localGw.VertIdStart + localGw.VertexCount
			}
			vOffSet := serialize.SUint32(bIdx)
			_ = sendBuffers[pID].Marshal(&vOffSet)
			for j := bIdx; j < eIdx; j++ {
				vertex := graph.Data.Vertex.GetVertex(j)
				_ = sendBuffers[pID].Marshal(&vertex)
				if graph.UseProperty {
					for _, k := range graph.Data.VertexPropertySchema.Schema {
						value := graph.Data.VertexProperty.GetValue(k.PropKey, j)
						_ = sendBuffers[pID].Marshal(value)
					}
				}
				if sendBuffers[pID].Full() {
					count := int32(sendBuffers[pID].ObjCount())
					if graph.UseProperty {
						count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
					}
					for _, peer := range peers {
						atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
						peer.LoadActionHandler.LoadAction(
							loadTask.Task.ID,
							pb.LoadAction_LoadScatter,
							count,
							false,
							0,
							sendBuffers[pID].PayLoad())
					}
					sendBuffers[pID].Reset()
					vOffSet = serialize.SUint32(j + 1)
					_ = sendBuffers[pID].Marshal(&vOffSet)
				}
			}
			count := int32(sendBuffers[pID].ObjCount())
			if graph.UseProperty {
				count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
			}
			for _, peer := range peers {
				atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
				peer.LoadActionHandler.LoadAction(
					loadTask.Task.ID,
					pb.LoadAction_LoadScatter,
					count,
					false,
					0,
					sendBuffers[pID].PayLoad())
			}
			sendBuffers[pID].Reset()
		}(i)
	}
	wg.Wait()
	// vOffSet := serialize.SUint32(localGw.VertIdStart)
	// _ = sendBuffer.Marshal(&vOffSet)
	// for i := localGw.VertIdStart; i < localGw.VertIdStart+localGw.VertexCount; i++ {
	// 	vertex := graph.Data.Vertex.GetVertex(i)
	// 	_ = sendBuffer.Marshal(&vertex)
	// 	if graph.UseProperty {
	// 		for _, k := range graph.Data.VertexPropertySchema.Schema {
	// 			value := graph.Data.VertexProperty.GetValue(k.PropKey, i)
	// 			_ = sendBuffer.Marshal(value)
	// 		}
	// 	}
	// 	if sendBuffer.Full() {
	// 		count := int32(sendBuffer.ObjCount())
	// 		if graph.UseProperty {
	// 			count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
	// 		}
	// 		for _, peer := range peers {
	// 			peer.LoadActionHandler.LoadAction(
	// 				loadTask.Task.ID,
	// 				pb.LoadAction_LoadScatter,
	// 				count,
	// 				false,
	// 				0,
	// 				sendBuffer.PayLoad())
	// 		}
	// 		sendBuffer.Reset()
	// 		vOffSet = serialize.SUint32(i + 1)
	// 		_ = sendBuffer.Marshal(&vOffSet)
	// 	}
	// }
	// count := int32(sendBuffer.ObjCount())
	// if graph.UseProperty {
	// 	count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
	// }
	for _, peer := range peers {
		atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
		peer.LoadActionHandler.LoadAction(
			loadTask.Task.ID,
			pb.LoadAction_LoadScatter,
			0,
			true,
			atomic.LoadInt32(loadTask.SendCount[peer.Name]),
			[]byte{})
	}
	// sendBuffer.Reset()
	for s := range loadTask.SendCount {
		*loadTask.SendCount[s] = 0
	}
}