vermeer/apps/worker/grpc_handlers.go (710 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package worker
import (
"context"
"fmt"
"strings"
"sync"
"time"
"vermeer/apps/common"
"vermeer/apps/graphio"
pb "vermeer/apps/protos"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
type PeerHandler struct {
pb.UnimplementedWorkerServer
}
func (ph *PeerHandler) SayHelloPeer(ctx context.Context, req *pb.HelloPeerReq) (*pb.HelloPeerResp, error) {
p, _ := peer.FromContext(ctx)
logrus.Infof("peer say hello name: %s, client: %s", req.GetSourceName(), p.Addr.String())
semi := strings.LastIndex(p.Addr.String(), ":")
if semi < 0 {
err := fmt.Errorf("worker grpc peer ip error: %s", req)
logrus.Errorf(err.Error())
return &pb.HelloPeerResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err
}
ip := p.Addr.String()[:semi]
semi = strings.LastIndex(req.GetWorkerPeer(), ":")
if semi < 0 {
err := fmt.Errorf("worker grpc peer port error: %s", req.WorkerPeer)
logrus.Errorf(err.Error())
return &pb.HelloPeerResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err
}
port := req.GetWorkerPeer()[semi+1:]
if ServiceWorker.WorkerName != req.TargetName {
err := fmt.Errorf("worker grpc peer target error: %s", req.TargetName)
logrus.Errorf(err.Error())
return &pb.HelloPeerResp{Base: &pb.BaseResponse{ErrorCode: -2, Message: err.Error()}}, err
}
PeerMgr.AddPeer(req.SourceName, req.Id, ip+":"+port)
return &pb.HelloPeerResp{}, nil
}
func (ph *PeerHandler) Scatter(stream pb.Worker_ScatterServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
name := md.Get("worker_name")[0]
PeerMgr.GetPeer(name).ScatterHandler.SetServer(stream)
PeerMgr.GetPeer(name).ScatterHandler.RecvHandler(name)
return nil
}
func (ph *PeerHandler) LoadAction(stream pb.Worker_LoadActionServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
name := md.Get("worker_name")[0]
PeerMgr.GetPeer(name).LoadActionHandler.SetServer(stream)
PeerMgr.GetPeer(name).LoadActionHandler.RecvHandler(name)
return nil
}
func (ph *PeerHandler) StepEnd(stream pb.Worker_StepEndServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
name := md.Get("worker_name")[0]
PeerMgr.GetPeer(name).StepEndHandler.SetServer(stream)
PeerMgr.GetPeer(name).StepEndHandler.RecvHandler(name)
return nil
}
func (ph *PeerHandler) SettingAction(stream pb.Worker_SettingActionServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
name := md.Get("worker_name")[0]
PeerMgr.GetPeer(name).SettingActionHandler.SetServer(stream)
PeerMgr.GetPeer(name).SettingActionHandler.RecvHandler(name)
return nil
}
func (ph *PeerHandler) DeleteGraph(ctx context.Context, req *pb.DeleteGraphReq) (*pb.DeleteGraphResp, error) {
_ = ctx
if req.DeleteFile {
GraphMgr.DeleteGraphFile(req.GetSpaceName(), req.GraphName)
}
GraphMgr.DeleteGraph(req.GetSpaceName(), req.GraphName)
return &pb.DeleteGraphResp{}, nil
}
func (ph *PeerHandler) GetEdges(ctx context.Context, req *pb.GetEdgesReq) (*pb.GetEdgesResp, error) {
_ = ctx
eb := EdgesBl{}
inEdges, outEdges, edgeProperty, err := eb.GetEdges(req.GetSpaceName(), req.GraphName, req.VertexId, req.Direction)
if err != nil {
return &pb.GetEdgesResp{}, err
}
return &pb.GetEdgesResp{InEdges: inEdges, OutEdges: outEdges, InEdgeProperty: edgeProperty}, nil
}
func (ph *PeerHandler) GetVertex(ctx context.Context, req *pb.GetVertexReq) (*pb.GetVertexResp, error) {
_, _ = ctx, req
graph := GraphMgr.GetGraphByName(req.GetSpaceName(), req.GetGraphName())
vertexIds := req.GetVertexId()
if graph != nil && len(vertexIds) > 0 {
resp := &pb.GetVertexResp{}
resp.Verts = make([]*pb.VertexInfo, 0, len(vertexIds))
for i := 0; i < len(vertexIds); i++ {
resp.Verts = append(resp.Verts, &pb.VertexInfo{})
shortId, ok := graph.Data.Vertex.GetVertexIndex(vertexIds[i])
if !ok {
logrus.Errorf("vertexId %s not found", vertexIds[i])
continue
}
if shortId < graph.Data.VertIDStart ||
shortId >= graph.Data.VertIDStart+graph.Data.VertexCount {
logrus.Errorf("vertexId %s not in range", vertexIds[i])
continue
}
resp.Verts[i].ID = vertexIds[i]
if graph.UseProperty {
properties := make(map[string]string)
for _, ps := range graph.Data.VertexPropertySchema.Schema {
properties[ps.PropKey] = graph.Data.VertexProperty.GetValue(ps.PropKey, shortId).ToString()
}
resp.Verts[i].Property = properties
}
}
return resp, nil
}
return nil, nil
}
// ControlTask 控制任务
func (ph *PeerHandler) ControlTask(ctx context.Context, req *pb.ControlTaskReq) (*pb.ControlTaskResp, error) {
_ = ctx
task := TaskMgr.GetTaskByID(req.GetTaskID())
if task == nil {
return &pb.ControlTaskResp{}, nil
}
err := TaskMgr.SetAction(task, req.Action)
if err != nil {
return &pb.ControlTaskResp{}, err
}
//SELECT:
// for {
// switch task.State {
// case structure.TaskStateLoaded, structure.TaskStateComplete:
// break SELECT
// case structure.TaskStateCanceled, structure.TaskStateError:
// break SELECT
// case structure.TaskStateCanceling:
// time.Sleep(100 * time.Millisecond)
// default:
// task.SetState(structure.TaskStateCanceling)
// }
// }
return &pb.ControlTaskResp{}, nil
}
func (ph *PeerHandler) SaveGraph(ctx context.Context, req *pb.GraphPersistenceReq) (*pb.GraphPersistenceResp, error) {
_ = ctx
logrus.Infof("SaveGraph %v", req.GraphName)
err := GraphMgr.SaveGraph(req.GetSpaceName(), req.GraphName, ServiceWorker.WorkerName)
res := new(pb.GraphPersistenceResp)
if err != nil {
logrus.Infof("SaveGraph %v error,err=%v", req.GraphName, err.Error())
res.Base = &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}
}
return res, err
}
func (ph *PeerHandler) ReadGraph(ctx context.Context, req *pb.GraphPersistenceReq) (*pb.GraphPersistenceResp, error) {
_ = ctx
graphName := req.GetGraphName()
res := new(pb.GraphPersistenceResp)
logrus.Infof("ReadGraph %v", req.GraphName)
err := GraphMgr.ReadGraph(req.GetSpaceName(), graphName, ServiceWorker.WorkerName)
if err != nil {
logrus.Infof("ReadGraph %v error,err=%v", req.GraphName, err.Error())
res.Base = &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}
return res, err
}
return res, nil
}
func (ph *PeerHandler) WriteDisk(ctx context.Context, req *pb.GraphPersistenceReq) (*pb.GraphPersistenceResp, error) {
_ = ctx
graphName := req.GetGraphName()
res := new(pb.GraphPersistenceResp)
logrus.Infof("WriteDisk %v", req.GraphName)
err := GraphMgr.WriteDisk(req.SpaceName, graphName, ServiceWorker.WorkerName)
if err != nil {
logrus.Infof("WriteDisk %v error,err=%v", req.GraphName, err.Error())
res.Base = &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}
return res, err
}
return res, nil
}
func (ph *PeerHandler) GetWorkerStatInfo(ctx context.Context, req *pb.WorkerStatInfoReq) (*pb.WorkerStatInfoResp, error) {
_ = ctx
_ = req
resp := new(pb.WorkerStatInfoResp)
mp, err := common.MachineMemUsedPercent()
if err != nil {
resp.Base.ErrorCode = -1
resp.Base.Message = err.Error()
}
resp.MemMachineUsedPercent = mp
pp, err := common.ProcessMemUsedPercent()
if err != nil {
resp.Base.ErrorCode = -1
resp.Base.Message = err.Error()
}
resp.MemProcessUsedPercent = pp
//logrus.Infof("GetWorkerStatInfo mem:%v", common.PrintMemUsage())
//logrus.Infof("GetWorkerStatInfo res:%v", resp)
return resp, nil
}
func (ph *PeerHandler) RuntimeAction(ctx context.Context, req *pb.RuntimeActionReq) (*pb.RuntimeActionResp, error) {
_ = ctx
var resp *pb.RuntimeActionResp
switch req.GetRequest().(type) {
case *pb.RuntimeActionReq_HostInfoReq:
hostInfo, err := RuntimeSpv.HostInfo()
if err != nil {
logrus.Errorf("get host info error:%v", err)
return nil, err
}
resp = &pb.RuntimeActionResp{
Response: &pb.RuntimeActionResp_HostInfoResp{
HostInfoResp: &pb.GetHostInfoResp{
TotalMemory: uint32(hostInfo.TotalMemory / (1024 * 1024)),
AvailableMemory: uint32(hostInfo.AvailableMemory / (1024 * 1024)),
CPUCount: uint32(hostInfo.CPUs),
},
},
}
case *pb.RuntimeActionReq_MemoryLimitReq:
RuntimeSpv.SetMemoryLimit(
uint64(req.GetMemoryLimitReq().MaxMemoryUsed)*1024*1024,
uint64(req.GetMemoryLimitReq().MinRemainMemory)*1024*1024,
req.GetMemoryLimitReq().SoftMemoryLimitRatio,
)
resp = &pb.RuntimeActionResp{Response: &pb.RuntimeActionResp_MemoryLimitResp{MemoryLimitResp: &pb.SetMemoryLimitResp{}}}
case *pb.RuntimeActionReq_CPULimitReq:
RuntimeSpv.SetMaxCPU(int(req.GetCPULimitReq().MaxCPU))
resp = &pb.RuntimeActionResp{Response: &pb.RuntimeActionResp_CPULimitResp{CPULimitResp: &pb.SetCPULimitResp{}}}
}
return resp, nil
}
type LoadGraphTaskHandler struct {
grpcStream pb.Master_LoadGraphTaskClient
}
func (rh *LoadGraphTaskHandler) HandleLoadGraphTask() {
for {
resp, err := rh.grpcStream.Recv()
if err != nil {
logrus.Errorf("recv load graph task error: %s", err)
time.Sleep(3 * time.Second)
ServiceWorker.ReconnectMaster()
continue
}
loadBl := LoadGraphBl{}
if resp.Step == pb.LoadStep_Vertex {
loadBl.StartLoadGraph(resp.SpaceName, resp.TaskId, resp.GraphName, resp.Workers, resp.Params)
} else if resp.Step == pb.LoadStep_ScatterVertex {
loadBl.ScatterVertex(resp.TaskId)
} else if resp.Step == pb.LoadStep_Edge {
loadBl.LoadEdge(resp.TaskId)
} else if resp.Step == pb.LoadStep_Complete {
loadBl.LoadComplete(resp.TaskId)
} else if resp.Step == pb.LoadStep_OutDegree {
loadBl.ScatterOutDegree(resp.TaskId)
} else if resp.Step == pb.LoadStep_Error {
loadBl.GetStatusError(resp.TaskId)
}
}
}
func (rh *LoadGraphTaskHandler) LoadComplete(taskId int32, partId int32) error {
err := rh.grpcStream.Send(&pb.LoadGraphTaskReq{
TaskId: taskId,
PartId: partId,
WorkerName: ServiceWorker.WorkerName,
State: graphio.LoadPartStatusDone,
})
return err
}
type ComputeTaskHandler struct {
grpcStream pb.Master_ComputeTaskClient
}
func (rh *ComputeTaskHandler) HandleComputeTask() {
for {
resp, err := rh.grpcStream.Recv()
if err != nil {
logrus.Errorf("recv compute task error: %s", err)
time.Sleep(3 * time.Second)
ServiceWorker.ReconnectMaster()
continue
}
cb := ComputeBl{}
sb := SettingBl{}
switch resp.Action {
case pb.ComputeAction_Compute:
cb.StartCompute(resp.TaskId, resp.SpaceName, resp.GraphName, resp.Algorithm, resp.Params)
case pb.ComputeAction_SettingOutEdges:
sb.StartSettingOutEdges(resp.TaskId, resp.SpaceName, resp.GraphName, resp.Params)
case pb.ComputeAction_SettingOutDegree:
sb.StartSettingOutDegree(resp.TaskId, resp.SpaceName, resp.GraphName, resp.Params)
}
}
}
func (rh *ComputeTaskHandler) ComputeComplete(taskId int32) error {
err := rh.grpcStream.Send(&pb.ComputeTaskReq{
TaskId: taskId,
WorkerName: ServiceWorker.WorkerName,
})
return err
}
type SuperStepHandler struct {
grpcStream pb.Master_SuperStepClient
}
func (rh *SuperStepHandler) HandleSuperStep() {
for {
resp, err := rh.grpcStream.Recv()
if err != nil {
logrus.Errorf("recv super step error: %s", err)
time.Sleep(3 * time.Second)
ServiceWorker.ReconnectMaster()
continue
}
cb := ComputeBl{}
if resp.Output {
go cb.RunOutput(resp.TaskId)
} else {
go cb.RunSuperStep(resp.TaskId, resp.ComputeValues)
}
}
}
//func (rh *SuperStepHandler) SuperStepDone(taskId int32, status string) error {
// err := rh.grpcStream.Send(&pb.SuperStepReq{
// TaskId: taskId,
// State: status,
// WorkerName: ServiceWorker.WorkerName,
// })
// return err
//}
const (
HandlerModeClient = byte(1)
HandlerModeServer = byte(2)
)
type LoadActionHandler struct {
mode byte
grpcServer pb.Worker_LoadActionServer
grpcClient pb.Worker_LoadActionClient
locker sync.Mutex
}
func (dh *LoadActionHandler) SetServer(stream pb.Worker_LoadActionServer) {
dh.mode = HandlerModeServer
dh.grpcServer = stream
}
func (dh *LoadActionHandler) SetClient(client pb.Worker_LoadActionClient) {
dh.mode = HandlerModeClient
dh.grpcClient = client
}
func (dh *LoadActionHandler) RecvHandler(name string) {
logrus.Infof("load action recv handler setup: %s", name)
dh.locker = sync.Mutex{}
loadBl := LoadGraphBl{}
if dh.mode == HandlerModeServer {
for {
resp, err := dh.grpcServer.Recv()
if err != nil {
logrus.Errorf("recv load action error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
//logrus.Infof("LoadActionHandler server recv action: %v, end: %v", resp.Action, resp.End)
if resp.Action == pb.LoadAction_LoadVertex {
go loadBl.RecvVertex(
resp.TaskId,
resp.WorkerName,
resp.Count,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadScatter {
go loadBl.GatherVertex(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadEdge {
go loadBl.RecvEdge(
resp.TaskId,
resp.WorkerName,
resp.Count,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadOutDegree {
go loadBl.GatherOutDegree(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Data)
}
}
} else if dh.mode == HandlerModeClient {
for {
resp, err := dh.grpcClient.Recv()
if err != nil {
logrus.Errorf("recv load action error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
//logrus.Infof("LoadActionHandler client recv action: %v, end: %v", resp.Action, resp.End)
if resp.Action == pb.LoadAction_LoadVertex {
go loadBl.RecvVertex(
resp.TaskId,
resp.WorkerName,
resp.Count,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadScatter {
go loadBl.GatherVertex(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadEdge {
go loadBl.RecvEdge(
resp.TaskId,
resp.WorkerName,
resp.Count,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.LoadAction_LoadOutDegree {
go loadBl.GatherOutDegree(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Data)
}
}
}
}
func (dh *LoadActionHandler) LoadAction(taskId int32, action pb.LoadAction, count int32, end bool, endNum int32, data []byte) {
dh.locker.Lock()
defer dh.locker.Unlock()
if dh.mode == HandlerModeServer {
req := pb.LoadActionResp{
TaskId: taskId,
Data: data,
Count: count,
Action: action,
End: end,
Num: endNum,
WorkerName: ServiceWorker.WorkerName,
}
err := dh.grpcServer.Send(&req)
if err != nil {
logrus.Errorf("send do action error: %s", err)
}
//logrus.Infof("LoadActionHandler server send action: %v", action)
} else if dh.mode == HandlerModeClient {
req := pb.LoadActionReq{
TaskId: taskId,
Data: data,
Count: count,
Action: action,
End: end,
Num: endNum,
WorkerName: ServiceWorker.WorkerName,
}
err := dh.grpcClient.Send(&req)
if err != nil {
logrus.Errorf("send do action error: %s", err)
}
//logrus.Infof("LoadActionHandler client send action: %v", action)
}
}
type ScatterHandler struct {
mode byte
grpcServer pb.Worker_ScatterServer
grpcClient pb.Worker_ScatterClient
locker sync.Mutex
}
func (sh *ScatterHandler) SetServer(stream pb.Worker_ScatterServer) {
sh.mode = HandlerModeServer
sh.grpcServer = stream
}
func (sh *ScatterHandler) SetClient(client pb.Worker_ScatterClient) {
sh.mode = HandlerModeClient
sh.grpcClient = client
}
func (sh *ScatterHandler) RecvHandler(name string) {
logrus.Infof("scatter recv handler setup: %s", name)
sh.locker = sync.Mutex{}
if sh.mode == HandlerModeServer {
for {
resp, err := sh.grpcServer.Recv()
if err != nil {
logrus.Errorf("recv scatter error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
cb := ComputeBl{}
go cb.RecvScatter(resp.TaskId, resp.Data, resp.End, resp.SIdx)
}
} else if sh.mode == HandlerModeClient {
for {
resp, err := sh.grpcClient.Recv()
if err != nil {
logrus.Errorf("recv scatter error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
cb := ComputeBl{}
go cb.RecvScatter(resp.TaskId, resp.Data, resp.End, resp.SIdx)
}
}
}
func (sh *ScatterHandler) SendScatter(taskId int32, step int32, count int32, end bool, sIdx int32, data []byte) {
sh.locker.Lock()
defer sh.locker.Unlock()
if sh.mode == HandlerModeServer {
req := pb.ScatterResp{
TaskId: taskId,
Step: step,
Data: data,
Count: count,
End: end,
SIdx: sIdx,
WorkerName: ServiceWorker.WorkerName,
}
err := sh.grpcServer.Send(&req)
if err != nil {
logrus.Errorf("send scatter error: %s", err)
}
} else if sh.mode == HandlerModeClient {
req := pb.ScatterReq{
TaskId: taskId,
Step: step,
Data: data,
Count: count,
End: end,
SIdx: sIdx,
WorkerName: ServiceWorker.WorkerName,
}
err := sh.grpcClient.Send(&req)
if err != nil {
logrus.Errorf("send scatter error: %s", err)
}
}
}
type StepEndHandler struct {
mode byte
grpcServer pb.Worker_StepEndServer
grpcClient pb.Worker_StepEndClient
locker sync.Mutex
}
func (sh *StepEndHandler) SetServer(stream pb.Worker_StepEndServer) {
sh.mode = HandlerModeServer
sh.grpcServer = stream
}
func (sh *StepEndHandler) SetClient(client pb.Worker_StepEndClient) {
sh.mode = HandlerModeClient
sh.grpcClient = client
}
func (sh *StepEndHandler) RecvHandler(name string) {
logrus.Infof("step end recv handler setup: %s", name)
sh.locker = sync.Mutex{}
if sh.mode == HandlerModeServer {
for {
resp, err := sh.grpcServer.Recv()
if err != nil {
logrus.Errorf("recv step end error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
cb := ComputeBl{}
go cb.StepEnd(resp.TaskId, resp.WorkerName)
}
} else if sh.mode == HandlerModeClient {
for {
resp, err := sh.grpcClient.Recv()
if err != nil {
logrus.Errorf("recv step end error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
cb := ComputeBl{}
go cb.StepEnd(resp.TaskId, resp.WorkerName)
}
}
}
func (sh *StepEndHandler) SendStepEnd(taskId int32) {
sh.locker.Lock()
defer sh.locker.Unlock()
if sh.mode == HandlerModeServer {
req := pb.StepEndResp{
TaskId: taskId,
WorkerName: ServiceWorker.WorkerName,
}
err := sh.grpcServer.Send(&req)
if err != nil {
logrus.Errorf("send scatter error: %s", err)
}
} else if sh.mode == HandlerModeClient {
req := pb.StepEndReq{
TaskId: taskId,
WorkerName: ServiceWorker.WorkerName,
}
err := sh.grpcClient.Send(&req)
if err != nil {
logrus.Errorf("send scatter error: %s", err)
}
}
}
type SettingActionHandler struct {
mode byte
grpcServer pb.Worker_SettingActionServer
grpcClient pb.Worker_SettingActionClient
locker sync.Mutex
}
func (dh *SettingActionHandler) SetServer(stream pb.Worker_SettingActionServer) {
dh.mode = HandlerModeServer
dh.grpcServer = stream
}
func (dh *SettingActionHandler) SetClient(client pb.Worker_SettingActionClient) {
dh.mode = HandlerModeClient
dh.grpcClient = client
}
func (dh *SettingActionHandler) RecvHandler(name string) {
logrus.Infof("load action recv handler setup: %s", name)
dh.locker = sync.Mutex{}
settingBl := SettingBl{}
if dh.mode == HandlerModeServer {
for {
resp, err := dh.grpcServer.Recv()
if err != nil {
logrus.Errorf("recv setting action error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
//logrus.Infof("LoadActionHandler server recv action: %v, end: %v", resp.Action, resp.End)
if resp.Action == pb.SettingAction_SetOutEdges {
go settingBl.GatherOutEdges(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.SettingAction_SetOutDegree {
go settingBl.GatherOutDegree(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
}
}
} else if dh.mode == HandlerModeClient {
for {
resp, err := dh.grpcClient.Recv()
if err != nil {
logrus.Errorf("recv setting action error: %s", err)
time.Sleep(2 * time.Second)
if !PeerMgr.CheckPeerAlive(name) {
PeerMgr.RemovePeer(name)
break
}
continue
}
//logrus.Infof("LoadActionHandler client recv action: %v, end: %v", resp.Action, resp.End)
if resp.Action == pb.SettingAction_SetOutEdges {
go settingBl.GatherOutEdges(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
} else if resp.Action == pb.SettingAction_SetOutDegree {
go settingBl.GatherOutDegree(
resp.TaskId,
resp.WorkerName,
resp.End,
resp.Num,
resp.Data)
}
}
}
}
func (dh *SettingActionHandler) SettingAction(taskId int32, action pb.SettingAction, end bool, endNum int32, data []byte) {
dh.locker.Lock()
defer dh.locker.Unlock()
if dh.mode == HandlerModeServer {
req := pb.SettingActionResp{
TaskId: taskId,
Data: data,
Action: action,
End: end,
Num: endNum,
WorkerName: ServiceWorker.WorkerName,
}
err := dh.grpcServer.Send(&req)
if err != nil {
logrus.Errorf("send do action error: %s", err)
}
//logrus.Infof("LoadActionHandler server send action: %v", action)
} else if dh.mode == HandlerModeClient {
req := pb.SettingActionReq{
TaskId: taskId,
Data: data,
Action: action,
End: end,
Num: endNum,
WorkerName: ServiceWorker.WorkerName,
}
err := dh.grpcClient.Send(&req)
if err != nil {
logrus.Errorf("send do action error: %s", err)
}
//logrus.Infof("LoadActionHandler client send action: %v", action)
}
}