func()

in vermeer/apps/worker/load_graph_bl.go [521:660]


func (lb *LoadGraphBl) RunLoadVertex(ctx context.Context, loadTask *graphio.LoadGraphTask) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("RunLoadVertex panic recover panic:%v, stack message: %s",
				r, common.GetCurrentGoroutineStack()))
			logrus.Errorf("RunLoadVertex panic recover taskID:%v, panic:%v, stack message: %s",
				loadTask.Task.ID, r, common.GetCurrentGoroutineStack())
		}
	}()
	if !lb.CheckAction(loadTask) {
		return
	}
	//defer lb.CheckAction(loadTask)
	workerCount := len(loadTask.Task.Workers)

	sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
	peers := make([]*PeerClient, 0, workerCount)
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	for _, v := range loadTask.Task.Workers {
		peers = append(peers, PeerMgr.GetPeer(v.Name))
		buf := buffer.EncodeBuffer{}
		buf.Init(BufferSize)
		sendBuffers = append(sendBuffers, buf)
	}

	vertex := structure.Vertex{}
	property := structure.PropertyValue{}
	property.Init(graph.Data.VertexPropertySchema)
	for {
		if !lb.CheckAction(loadTask) {
			return
		}
		reqCtx := context.Background()
		req := pb.FetchLoadPartReq{}
		req.TaskId = loadTask.Task.ID
		req.WorkerName = ServiceWorker.WorkerName
		resp, err := ServiceWorker.MasterClient.FetchLoadPart(reqCtx, &req)
		if err != nil {
			logrus.Errorf("RunLoadVertex fetch partition error: %s", err)
			break
		}
		if resp.PartId == 0 {
			logrus.Infof("RunLoadVertex fetch part eof, worker: %d", ctx.Value("worker_id"))
			break
		}

		loader := graphio.MakeLoader(loadTask.LoadType)
		err = loader.Init(resp.Params, graph.Data.VertexPropertySchema)
		if err != nil {
			lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("graph loader init error: %s", err))
			logrus.Errorf("graph loader init error: %s", err)
			loader.Close()
			return
		}
		logrus.Infof("start read part: %s", loader.Name())
		for {
			err = loader.ReadVertex(&vertex, &property)
			if err != nil {
				if err == io.EOF {
					logrus.Infof("read part eof: %s, count: %d", loader.Name(), loader.ReadCount())
					break
				}
				lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("read vertex error: %s", err))
				logrus.Errorf("read vertex error: %s", err)
				loader.Close()
				return
			}
			workerIdx := common.HashBKDR(vertex.ID) % workerCount
			_ = sendBuffers[workerIdx].Marshal(&vertex)
			if graph.UseProperty {
				_ = sendBuffers[workerIdx].Marshal(&property)
			}
			if sendBuffers[workerIdx].Full() {
				atomic.AddInt32(loadTask.SendCount[peers[workerIdx].Name], 1)
				count := int32(sendBuffers[workerIdx].ObjCount())
				if graph.UseProperty {
					count = int32(sendBuffers[workerIdx].ObjCount() / 2)
				}
				if peers[workerIdx].Self {
					lb.RecvVertex(
						loadTask.Task.ID,
						peers[workerIdx].Name,
						count,
						false,
						0,
						sendBuffers[workerIdx].PayLoad())
				} else {
					peers[workerIdx].LoadActionHandler.LoadAction(
						loadTask.Task.ID,
						pb.LoadAction_LoadVertex,
						count,
						false,
						0,
						sendBuffers[workerIdx].PayLoad())
				}

				sendBuffers[workerIdx].Reset()
			}
		}
		loader.Close()
	}

	loadTask.Locker.Lock()
	atomic.AddInt32(loadTask.Parallel, -1)
	end := atomic.LoadInt32(loadTask.Parallel) == 0
	for i := range sendBuffers {
		atomic.AddInt32(loadTask.SendCount[peers[i].Name], 1)
	}
	loadTask.Locker.Unlock()

	for i := range sendBuffers {
		count := int32(sendBuffers[i].ObjCount())
		if graph.UseProperty {
			count = int32(sendBuffers[i].ObjCount() / 2)
		}
		if peers[i].Self {
			lb.RecvVertex(
				loadTask.Task.ID,
				peers[i].Name,
				count,
				end,
				atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
				sendBuffers[i].PayLoad())
		} else {
			peers[i].LoadActionHandler.LoadAction(
				loadTask.Task.ID,
				pb.LoadAction_LoadVertex,
				count,
				end,
				atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
				sendBuffers[i].PayLoad())
		}
		sendBuffers[i].Reset()
	}
	if end {
		for s := range loadTask.SendCount {
			*loadTask.SendCount[s] = 0
		}
	}
}