vermeer/apps/master/bl/grpc_handlers.go (421 lines of code) (raw):

/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package bl import ( "context" "fmt" "sort" "strings" "sync" "time" "vermeer/apps/compute" "vermeer/apps/graphio" "vermeer/apps/master/threshold" "vermeer/apps/master/workers" pb "vermeer/apps/protos" "vermeer/apps/structure" "vermeer/apps/version" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" ) type ServerHandler struct { pb.UnimplementedMasterServer locker sync.Mutex } func (h *ServerHandler) Init() { h.locker = sync.Mutex{} } func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterReq) (*pb.HelloMasterResp, error) { h.locker.Lock() defer h.locker.Unlock() p, _ := peer.FromContext(ctx) // check worker version, if not match, deny if req.Version != version.Version { err := fmt.Errorf("registration denied: inconsistent registration version! master version %v, worker version %v", req.Version, version.Version) logrus.Errorf(err.Error()) return &pb.HelloMasterResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err } semi := strings.LastIndex(p.Addr.String(), ":") if semi < 0 { err := fmt.Errorf("worker grpc peer error: %s", req.WorkerPeer) logrus.Errorf(err.Error()) return &pb.HelloMasterResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err } ip := p.Addr.String()[:semi] semi = strings.LastIndex(req.GetWorkerPeer(), ":") if semi < 0 { err := fmt.Errorf("worker grpc peer error: %s", req.WorkerPeer) logrus.Errorf(err.Error()) return &pb.HelloMasterResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err } port := req.GetWorkerPeer()[semi+1:] workers := workerMgr.GetAllWorkers() var existWorkerName string for _, worker := range workers { if worker.GrpcPeer == ip+":"+port { existWorkerName = worker.Name break } } if existWorkerName != "" { //如果worker已经存在,必须等待至grpc recv handler感知后,删除worker for workerMgr.GetWorker(existWorkerName) != nil { logrus.Infof("worker %v exist, wait one second", existWorkerName) time.Sleep(1 * time.Second) } } reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version) if err != nil { logrus.Errorf("failed to create a WorkerClient, error: %s", err) return &pb.HelloMasterResp{WorkerId: -1, WorkerName: reqWorker.Name}, err } _, err = workerMgr.AddWorker(reqWorker) if err != nil { logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err) return &pb.HelloMasterResp{}, err } logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, p.Addr.String()) resp := pb.HelloMasterResp{ WorkerId: reqWorker.Id, WorkerName: reqWorker.Name, } for _, v := range workers { if v.GrpcPeer == ip+":"+port { logrus.Debugf("workerMgr has a duplicate address, ignore it") continue } workerInfo := pb.WorkerInfo{ Name: v.Name, Id: v.Id, GrpcPeer: v.GrpcPeer, } resp.Workers = append(resp.Workers, &workerInfo) } // connect to client workerClient := workerMgr.GetWorker(reqWorker.Name) dialOptions := grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(1*1024*1024*1024), grpc.MaxCallRecvMsgSize(1*1024*1024*1024)) workerClient.Connection, err = grpc.Dial( ip+":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()), dialOptions, grpc.WithBlock(), grpc.WithIdleTimeout(0)) if err != nil { logrus.Errorf("connect worker error: %s", err) return &resp, err } spaces := structure.SpaceManager.GetAllSpace() for _, space := range spaces { resp.Spaces = append(resp.Spaces, space.Name()) } workerClient.Session = pb.NewWorkerClient(workerClient.Connection) initWorkerClient(workerClient) graphs := graphMgr.GetGraphsByWorker(reqWorker.Name) taskCanRecovery := make([]*structure.TaskInfo, 0) for _, graph := range graphs { if graph.State == structure.GraphStateInComplete { workerAllAlive := true for _, worker := range graph.Workers { if !workerMgr.CheckWorkerAlive(worker.Name) { workerAllAlive = false break } } if workerAllAlive { graph.SetState(structure.GraphStateOnDisk) // InComplete is not a persistent state tasks := taskMgr.GetTaskByGraph(graph.SpaceName, graph.Name) for _, task := range tasks { if task.State == structure.TaskStateWaiting { logrus.Infof("waiting taskid:%v", task.ID) taskCanRecovery = append(taskCanRecovery, task) } else if task.State != structure.TaskStateComplete && task.State != structure.TaskStateLoaded && task.State != structure.TaskStateError { // task.SetErrMsg(fmt.Errorf("recovery task error, task state:%v", task.State).Error()) // task.SetState(structure.TaskStateError) taskMgr.SetError(task, fmt.Errorf("recovery task error, task state:%v", task.State).Error()) err := taskMgr.FinishTask(task.ID) if err != nil { logrus.Errorf("finish task error: %s", err) } } } } } } go func() { sort.Slice(taskCanRecovery, func(i, j int) bool { return taskCanRecovery[i].ID < taskCanRecovery[j].ID }) time.Sleep(3 * time.Second) for _, task := range taskCanRecovery { _, err = Scheduler.QueueTask(task) if err != nil { logrus.Errorf("recovery task error:%v", err) } logrus.Infof("worker:%v recovery task:%v type:%v ", workerClient.Name, task.ID, task.Type) } }() return &resp, nil } func initWorkerClient(workerClient *workers.WorkerClient) { logrus.Infof("init worker: %s@%s", workerClient.Name, workerClient.GrpcPeer) // init memory limit err := workers.SendMemLimit(workerClient, &workers.WorkerMemLimit{ MaxMem: threshold.GroupMaxMemory(workerClient.Group), MinFree: threshold.GroupMinFree(workerClient.Group), GcRatio: float32(threshold.GroupGcPct(workerClient.Group)) / 100, }) if err != nil { logrus.Errorf("send memory limit to worker '%s@%s' error: %s", workerClient.Name, workerClient.GrpcPeer, err) } } func (h *ServerHandler) LoadGraphTask(stream pb.Master_LoadGraphTaskServer) error { md, _ := metadata.FromIncomingContext(stream.Context()) name := md.Get("worker_name")[0] //WorkerMgr.GetWorker(name).loadServer.SetServer(stream) //WorkerMgr.GetWorker(name).loadServer.RecvHandler(name) return ServerMgr.StartingLoadServer(name, stream) } func (h *ServerHandler) ComputeTask(stream pb.Master_ComputeTaskServer) error { md, _ := metadata.FromIncomingContext(stream.Context()) name := md.Get("worker_name")[0] //WorkerMgr.GetWorker(name).computeServer.SetServer(stream) //WorkerMgr.GetWorker(name).computeServer.RecvHandler(name) return ServerMgr.StartingComputeServer(name, stream) } func (h *ServerHandler) SuperStep(req *pb.SuperStepReq, stream pb.Master_SuperStepServer) error { _ = req md, _ := metadata.FromIncomingContext(stream.Context()) name := md.Get("worker_name")[0] //WorkerMgr.GetWorker(name).SuperStepServer.SetServer(stream) if err := ServerMgr.PutSuperStepServer(name, stream); err != nil { return err } for i := 0; i < 100000000; i++ { time.Sleep(10 * time.Second) } //WorkerMgr.GetWorker(name).SuperStepServer.RecvHandler(name) return nil } func (h *ServerHandler) FetchLoadPart(ctx context.Context, req *pb.FetchLoadPartReq) (*pb.FetchLoadPartResp, error) { _ = ctx resp := pb.FetchLoadPartResp{} loadTask := LoadTaskBl{} partition := loadTask.FetchPartition(req.TaskId, req.WorkerName) resp.PartId = partition.Id resp.TaskId = req.TaskId resp.Params = partition.Params return &resp, nil } func (h *ServerHandler) LoadPartStatus( ctx context.Context, req *pb.LoadPartStatusReq) (*pb.LoadPartStatusResp, error) { _ = ctx resp := pb.LoadPartStatusResp{} loadTaskBl := LoadTaskBl{} loadTaskBl.LoadPartStatus(req.TaskId, req.PartId, req.State) return &resp, nil } func (h *ServerHandler) WorkVertexCount( ctx context.Context, req *pb.WorkerVertexCountReq) (*pb.WorkerVertexCountResp, error) { _ = ctx resp := pb.WorkerVertexCountResp{} loadTaskBl := LoadTaskBl{} loadTaskBl.WorkerVertexCount(req.TaskId, req.WorkerName, req.Count) return &resp, nil } func (h *ServerHandler) WorkEdgeCount( ctx context.Context, req *pb.WorkerEdgeCountReq) (*pb.WorkerEdgeCountResp, error) { _ = ctx resp := pb.WorkerEdgeCountResp{} loadTaskBl := LoadTaskBl{} loadTaskBl.WorkerEdgeCount(req.TaskId, req.WorkerName, req.Count) return &resp, nil } func (h *ServerHandler) GetGraphWorkers( ctx context.Context, req *pb.GetGraphWorkersReq) (*pb.GetGraphWorkersResp, error) { _ = ctx resp := pb.GetGraphWorkersResp{} loadTaskBl := LoadTaskBl{} gw := loadTaskBl.GetGraphWorkers(req.GetSpaceName(), req.GraphName) resp.Workers = make([]*pb.GraphWorker, 0, len(gw)) for _, v := range gw { w := pb.GraphWorker{ Name: v.Name, VertexCount: v.VertexCount, VertIdStart: v.VertIdStart, } resp.Workers = append(resp.Workers, &w) } return &resp, nil } func (h *ServerHandler) LoadTaskStatus( ctx context.Context, req *pb.LoadTaskStatusReq) (*pb.LoadTaskStatusResp, error) { h.locker.Lock() defer h.locker.Unlock() _ = ctx resp := pb.LoadTaskStatusResp{} loadTaskBl := LoadTaskBl{} loadTaskBl.LoadTaskStatus(req.TaskId, req.State, req.WorkerName, req.ErrorMsg) return &resp, nil } func (h *ServerHandler) ComputeTaskStatus( ctx context.Context, req *pb.ComputeTaskStatusReq) (*pb.ComputeTaskStatusResp, error) { _ = ctx h.locker.Lock() defer h.locker.Unlock() resp := pb.ComputeTaskStatusResp{} ctb := ComputeTaskBl{} ctb.ComputeTaskStatus(req.TaskId, req.State, req.WorkerName, req.Step, req.ComputeValues, req.ErrorMsg) return &resp, nil } func (h *ServerHandler) SettingGraph( ctx context.Context, req *pb.SettingGraphReq) (*pb.SettingGraphResp, error) { _ = ctx h.locker.Lock() defer h.locker.Unlock() resp := pb.SettingGraphResp{} ctb := ComputeTaskBl{} ctb.SettingGraphStatus(req.TaskId, req.State, req.WorkerName, req.ErrorMsg) return &resp, nil } func (h *ServerHandler) UploadVertexValue(ctx context.Context, in *pb.UploadVertexValueReq) (*pb.UploadVertexValueResp, error) { _ = ctx h.locker.Lock() defer h.locker.Unlock() computeTask := computerTaskMgr.GetTask(in.TaskId) for _, vertexValue := range in.GetVertexValues() { computeTask.ComputeValue.Values = append(computeTask.ComputeValue.Values, compute.VertexValue{ID: vertexValue.ID, Value: vertexValue.Value}) } //更新时间 if computeTask.ComputeValue.Timer != nil { computeTask.ComputeValue.Timer.Stop() } computerTaskMgr.InitTimer(computeTask.Task.ID) return &pb.UploadVertexValueResp{}, nil } func (h *ServerHandler) UploadStatistics(ctx context.Context, in *pb.UploadStatisticsReq) (*pb.UploadStatisticsResp, error) { _ = ctx h.locker.Lock() defer h.locker.Unlock() computeTask := computerTaskMgr.GetTask(in.TaskId) if computeTask.Statistics == nil { computeTask.Statistics = make([][]byte, 0) } computeTask.Statistics = append(computeTask.Statistics, in.GetStatistics()) return &pb.UploadStatisticsResp{}, nil } func (h *ServerHandler) UploadTPResult(ctx context.Context, in *pb.UploadTPResultReq) (*pb.UploadTPResultResp, error) { _ = ctx h.locker.Lock() defer h.locker.Unlock() computeTask := computerTaskMgr.GetTask(in.TaskId) if computeTask.TpResult.WorkerResult == nil { computeTask.TpResult.WorkerResult = make([][]byte, 0) } computeTask.TpResult.WorkerResult = append(computeTask.TpResult.WorkerResult, in.GetResult()) return &pb.UploadTPResultResp{}, nil } type LoadGraphServer struct { streamServer pb.Master_LoadGraphTaskServer } func (s *LoadGraphServer) SetServer(stream pb.Master_LoadGraphTaskServer) { s.streamServer = stream } func (s *LoadGraphServer) RecvHandler(workerName string) { logrus.Infof("load graph task recv handler setup: %s", workerName) for { req, err := s.streamServer.Recv() if err != nil { logrus.Errorf("load graph task recv: %s", err) time.Sleep(3 * time.Second) if workerBl.KickOffline(workerName) { break } // if !WorkerMgr.CheckWorkerAlive(workerName) { // WorkerMgr.RemoveWorker(workerName) // break // } continue } if req.State == graphio.LoadPartStatusPrepared { } else if req.State == graphio.LoadPartStatusDone { } } } type LoadingTaskReq struct { TaskId int32 Step pb.LoadStep LoadType string GraphName string SpaceName string Workers []string Params map[string]string } func (s *LoadGraphServer) SendLoadReq(req *LoadingTaskReq) error { return s.AsyncLoad(req.TaskId, req.Step, req.LoadType, req.GraphName, req.SpaceName, req.Workers, req.Params) } func (s *LoadGraphServer) AsyncLoad( taskId int32, step pb.LoadStep, loadType string, graphName string, spaceName string, workers []string, params map[string]string) error { err := s.streamServer.Send(&pb.LoadGraphTaskResp{ Base: &pb.BaseResponse{}, LoadType: loadType, TaskId: taskId, Params: params, Step: step, GraphName: graphName, SpaceName: spaceName, Workers: workers, }) if err != nil { logrus.Errorf("send load graph task error: %s", err) return err } return nil } type ComputeTaskServer struct { streamServer pb.Master_ComputeTaskServer } func (s *ComputeTaskServer) SetServer(stream pb.Master_ComputeTaskServer) { s.streamServer = stream } func (s *ComputeTaskServer) RecvHandler(workerName string) { logrus.Infof("compute task recv handler setup: %s", workerName) for { req, err := s.streamServer.Recv() if err != nil { logrus.Errorf("compute task recv error: %s", err) time.Sleep(3 * time.Second) if workerBl.KickOffline(workerName) { break } // if !WorkerMgr.CheckWorkerAlive(workerName) { // WorkerMgr.RemoveWorker(workerName) // break // } continue } _ = req } } func (s *ComputeTaskServer) AsyncCompute( algorithm string, graphName string, spaceName string, taskId int32, params map[string]string, action pb.ComputeAction) error { err := s.streamServer.Send(&pb.ComputeTaskResp{ Base: &pb.BaseResponse{}, TaskId: taskId, Algorithm: algorithm, GraphName: graphName, SpaceName: spaceName, Params: params, Action: action, }) if err != nil { logrus.Errorf("send async compute task error: %s", err) return err } return nil } type SuperStepServer struct { streamServer pb.Master_SuperStepServer } func (s *SuperStepServer) SetServer(stream pb.Master_SuperStepServer) { s.streamServer = stream } //func (s *SuperStepServer) RecvHandler(workerName string) { // logrus.Infof("super step recv handler setup: %s", workerName) // for { // req, err := s.streamServer.Recv() // if err != nil { // logrus.Errorf("super step recv: %s", err) // time.Sleep(3 * time.Second) // continue // } // _ = req // } //} func (s *SuperStepServer) AsyncSuperStep( taskId int32, step int32, output bool, computeValues map[string][]byte) { err := s.streamServer.Send(&pb.SuperStepResp{ Base: &pb.BaseResponse{}, TaskId: taskId, Step: step, ComputeValues: computeValues, Output: output, }) if err != nil { logrus.Errorf("AsyncSuperStep error: %s", err) } }