in vermeer/apps/worker/grpc_handlers.go [409:502]
// RecvHandler runs the receive loop of the load-action stream for the peer
// identified by name.
//
// Depending on dh.mode it reads messages from dh.grpcServer or dh.grpcClient.
// Each received message is handed to the matching LoadGraphBl method in a new
// goroutine, chosen by the message's Action; messages with any other action
// are dropped. When Recv fails, the error is logged, the loop waits two
// seconds and then asks PeerMgr whether the peer is still alive: if it is, the
// receive is retried, otherwise the peer is removed and the method returns.
// If dh.mode is neither HandlerModeServer nor HandlerModeClient the method
// returns right after its setup steps.
func (dh *LoadActionHandler) RecvHandler(name string) {
	logrus.Infof("load action recv handler setup: %s", name)
	dh.locker = sync.Mutex{}
	loadBl := LoadGraphBl{}

	// peerGone handles a failed Recv: it logs the error, backs off for two
	// seconds and reports whether the peer was found dead (and removed), in
	// which case the caller must stop receiving.
	peerGone := func(recvErr error) bool {
		logrus.Errorf("recv load action error: %s", recvErr)
		time.Sleep(2 * time.Second)
		if PeerMgr.CheckPeerAlive(name) {
			return false
		}
		PeerMgr.RemovePeer(name)
		return true
	}

	switch dh.mode {
	case HandlerModeServer:
		for {
			msg, recvErr := dh.grpcServer.Recv()
			if recvErr != nil {
				if peerGone(recvErr) {
					return
				}
				continue
			}
			//logrus.Infof("LoadActionHandler server recv action: %v, end: %v", msg.Action, msg.End)
			switch msg.Action {
			case pb.LoadAction_LoadVertex:
				go loadBl.RecvVertex(msg.TaskId, msg.WorkerName, msg.Count, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadScatter:
				go loadBl.GatherVertex(msg.TaskId, msg.WorkerName, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadEdge:
				go loadBl.RecvEdge(msg.TaskId, msg.WorkerName, msg.Count, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadOutDegree:
				go loadBl.GatherOutDegree(msg.TaskId, msg.WorkerName, msg.End, msg.Data)
			}
		}
	case HandlerModeClient:
		for {
			msg, recvErr := dh.grpcClient.Recv()
			if recvErr != nil {
				if peerGone(recvErr) {
					return
				}
				continue
			}
			//logrus.Infof("LoadActionHandler client recv action: %v, end: %v", msg.Action, msg.End)
			switch msg.Action {
			case pb.LoadAction_LoadVertex:
				go loadBl.RecvVertex(msg.TaskId, msg.WorkerName, msg.Count, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadScatter:
				go loadBl.GatherVertex(msg.TaskId, msg.WorkerName, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadEdge:
				go loadBl.RecvEdge(msg.TaskId, msg.WorkerName, msg.Count, msg.End, msg.Num, msg.Data)
			case pb.LoadAction_LoadOutDegree:
				go loadBl.GatherOutDegree(msg.TaskId, msg.WorkerName, msg.End, msg.Data)
			}
		}
	}
}