func()

in vermeer/apps/worker/grpc_handlers.go [409:502]


// RecvHandler runs the blocking receive loop for the load-action stream of
// the peer identified by name.
//
// It logs the setup, resets dh.locker to a fresh mutex, and then, depending on
// dh.mode, reads messages from either dh.grpcServer or dh.grpcClient. Every
// received message is dispatched on its own goroutine to the LoadGraphBl
// method matching its Action; messages with any other Action are ignored.
//
// When Recv fails, the error is logged and the loop waits two seconds. If
// PeerMgr then reports the peer as no longer alive, the peer is removed and
// RecvHandler returns; otherwise the receive is retried. If dh.mode is neither
// HandlerModeServer nor HandlerModeClient, the method returns right after the
// setup steps.
func (dh *LoadActionHandler) RecvHandler(name string) {
	logrus.Infof("load action recv handler setup: %s", name)
	dh.locker = sync.Mutex{}
	graphLoader := LoadGraphBl{}

	switch dh.mode {
	case HandlerModeServer:
		for {
			msg, recvErr := dh.grpcServer.Recv()
			if recvErr != nil {
				logrus.Errorf("recv load action error: %s", recvErr)
				// Back off before deciding whether the peer is still worth retrying.
				time.Sleep(2 * time.Second)
				if PeerMgr.CheckPeerAlive(name) {
					continue
				}
				PeerMgr.RemovePeer(name)
				return
			}
			//logrus.Infof("LoadActionHandler server recv action: %v, end: %v", resp.Action, resp.End)

			// Arguments of each go statement are evaluated here, in the
			// receiving goroutine, before the worker goroutine starts.
			switch msg.Action {
			case pb.LoadAction_LoadVertex:
				go graphLoader.RecvVertex(
					msg.TaskId, msg.WorkerName, msg.Count,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadScatter:
				go graphLoader.GatherVertex(
					msg.TaskId, msg.WorkerName,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadEdge:
				go graphLoader.RecvEdge(
					msg.TaskId, msg.WorkerName, msg.Count,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadOutDegree:
				go graphLoader.GatherOutDegree(
					msg.TaskId, msg.WorkerName,
					msg.End, msg.Data)
			}
		}

	case HandlerModeClient:
		for {
			msg, recvErr := dh.grpcClient.Recv()
			if recvErr != nil {
				logrus.Errorf("recv load action error: %s", recvErr)
				// Back off before deciding whether the peer is still worth retrying.
				time.Sleep(2 * time.Second)
				if PeerMgr.CheckPeerAlive(name) {
					continue
				}
				PeerMgr.RemovePeer(name)
				return
			}
			//logrus.Infof("LoadActionHandler client recv action: %v, end: %v", resp.Action, resp.End)

			// Arguments of each go statement are evaluated here, in the
			// receiving goroutine, before the worker goroutine starts.
			switch msg.Action {
			case pb.LoadAction_LoadVertex:
				go graphLoader.RecvVertex(
					msg.TaskId, msg.WorkerName, msg.Count,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadScatter:
				go graphLoader.GatherVertex(
					msg.TaskId, msg.WorkerName,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadEdge:
				go graphLoader.RecvEdge(
					msg.TaskId, msg.WorkerName, msg.Count,
					msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadOutDegree:
				go graphLoader.GatherOutDegree(
					msg.TaskId, msg.WorkerName,
					msg.End, msg.Data)
			}
		}
	}
}