func()

in vermeer/apps/master/bl/grpc_handlers.go [51:192]


// SayHelloMaster handles a worker's registration request.
//
// It rejects workers whose version differs from the master's. It then builds
// the worker's address from the connection's remote IP plus the port the
// worker announces in req.WorkerPeer, and waits for any worker already
// registered at that address to be removed. Next it registers the new worker
// with workerMgr and dials back to it. The response carries the assigned
// worker id/name, the other known workers and the names of all spaces.
//
// For graphs of this worker that are in the InComplete state and whose
// workers are all alive, the graph is moved back to OnDisk. Its waiting tasks
// are re-queued asynchronously (ordered by ID). Its other non-terminal tasks
// are marked as errors and finished.
//
// The whole registration runs while holding h.locker.
func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterReq) (*pb.HelloMasterResp, error) {
	h.locker.Lock()
	defer h.locker.Unlock()

	// reject logs err and returns it together with a response whose Base
	// carries ErrorCode -1 and the error message.
	reject := func(err error) (*pb.HelloMasterResp, error) {
		logrus.Error(err.Error())
		return &pb.HelloMasterResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err
	}

	// Check the worker version; deny registration if it does not match.
	if req.Version != version.Version {
		return reject(fmt.Errorf("registration denied: inconsistent registration version! master version %v, worker version %v", version.Version, req.Version))
	}

	// The worker's IP is taken from the connection's remote address. Peer info
	// may be absent from the context, so check before dereferencing it.
	p, ok := peer.FromContext(ctx)
	if !ok || p == nil || p.Addr == nil {
		return reject(fmt.Errorf("worker grpc peer error: no peer information in request context"))
	}
	peerAddr := p.Addr.String()
	semi := strings.LastIndex(peerAddr, ":")
	if semi < 0 {
		return reject(fmt.Errorf("worker grpc peer error: %s", peerAddr))
	}
	ip := peerAddr[:semi]

	// The port comes from the address the worker announces for its own server.
	semi = strings.LastIndex(req.GetWorkerPeer(), ":")
	if semi < 0 {
		return reject(fmt.Errorf("worker grpc peer error: %s", req.GetWorkerPeer()))
	}
	port := req.GetWorkerPeer()[semi+1:]
	workerAddr := ip + ":" + port

	workers := workerMgr.GetAllWorkers()
	var existWorkerName string
	for _, worker := range workers {
		if worker.GrpcPeer == workerAddr {
			existWorkerName = worker.Name
			break
		}
	}
	if existWorkerName != "" {
		// A worker is already registered at this address. It must be removed
		// (by the gRPC recv handler noticing it) before this one can register.
		// h.locker is held for the whole wait, so give up if the caller does.
		for workerMgr.GetWorker(existWorkerName) != nil {
			logrus.Infof("worker %v exist, wait one second", existWorkerName)
			select {
			case <-ctx.Done():
				return reject(fmt.Errorf("registration aborted while waiting for worker %v to be removed: %w", existWorkerName, ctx.Err()))
			case <-time.After(1 * time.Second):
			}
		}
	}

	reqWorker, err := workerMgr.CreateWorker(workerAddr, ip, req.Version)
	if err != nil {
		logrus.Errorf("failed to create a WorkerClient, error: %s", err)
		// reqWorker is not dereferenced here: a failing CreateWorker may
		// return nil.
		return &pb.HelloMasterResp{WorkerId: -1}, err
	}

	if _, err = workerMgr.AddWorker(reqWorker); err != nil {
		logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err)
		return &pb.HelloMasterResp{}, err
	}

	logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, peerAddr)

	resp := pb.HelloMasterResp{
		WorkerId:   reqWorker.Id,
		WorkerName: reqWorker.Name,
	}

	// Report the other known workers. The snapshot was taken before this
	// registration, so an entry at workerAddr is the previous registration
	// for the same address and is skipped.
	for _, v := range workers {
		if v.GrpcPeer == workerAddr {
			logrus.Debugf("workerMgr has a duplicate address, ignore it")
			continue
		}
		resp.Workers = append(resp.Workers, &pb.WorkerInfo{
			Name:     v.Name,
			Id:       v.Id,
			GrpcPeer: v.GrpcPeer,
		})
	}

	// Connect back to the worker.
	workerClient := workerMgr.GetWorker(reqWorker.Name)
	if workerClient == nil {
		err = fmt.Errorf("worker %s not found after registration", reqWorker.Name)
		logrus.Error(err.Error())
		return &resp, err
	}
	dialOptions := grpc.WithDefaultCallOptions(
		grpc.MaxCallSendMsgSize(1*1024*1024*1024),
		grpc.MaxCallRecvMsgSize(1*1024*1024*1024))

	// WithBlock makes the dial wait until connected. Passing ctx bounds that
	// wait by the incoming RPC instead of blocking indefinitely while
	// h.locker is held.
	workerClient.Connection, err = grpc.DialContext(ctx,
		workerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), dialOptions, grpc.WithBlock(), grpc.WithIdleTimeout(0))
	if err != nil {
		// NOTE(review): the worker remains registered in workerMgr without a
		// usable connection on this path; consider removing it here.
		logrus.Errorf("connect worker error: %s", err)
		return &resp, err
	}
	for _, space := range structure.SpaceManager.GetAllSpace() {
		resp.Spaces = append(resp.Spaces, space.Name())
	}
	workerClient.Session = pb.NewWorkerClient(workerClient.Connection)

	initWorkerClient(workerClient)

	// Collect recoverable tasks from this worker's InComplete graphs whose
	// workers are all alive again.
	var taskCanRecovery []*structure.TaskInfo
	for _, graph := range graphMgr.GetGraphsByWorker(reqWorker.Name) {
		if graph.State != structure.GraphStateInComplete {
			continue
		}
		workerAllAlive := true
		for _, worker := range graph.Workers {
			if !workerMgr.CheckWorkerAlive(worker.Name) {
				workerAllAlive = false
				break
			}
		}
		if !workerAllAlive {
			continue
		}
		graph.SetState(structure.GraphStateOnDisk) // InComplete is not a persistent state
		for _, task := range taskMgr.GetTaskByGraph(graph.SpaceName, graph.Name) {
			switch task.State {
			case structure.TaskStateWaiting:
				logrus.Infof("waiting taskid:%v", task.ID)
				taskCanRecovery = append(taskCanRecovery, task)
			case structure.TaskStateComplete, structure.TaskStateLoaded, structure.TaskStateError:
				// Already in a final state; nothing to do.
			default:
				// Any other state is marked as an error and finished.
				taskMgr.SetError(task, fmt.Errorf("recovery task error, task state:%v", task.State).Error())
				if err := taskMgr.FinishTask(task.ID); err != nil {
					logrus.Errorf("finish task error: %s", err)
				}
			}
		}
	}

	// Re-queue waiting tasks in ID order.
	sort.Slice(taskCanRecovery, func(i, j int) bool {
		return taskCanRecovery[i].ID < taskCanRecovery[j].ID
	})
	go func() {
		// Delay before queueing; presumably gives the newly connected worker
		// time to settle. TODO confirm the intent of the 3s value.
		time.Sleep(3 * time.Second)
		for _, task := range taskCanRecovery {
			// Use a goroutine-local err rather than the handler's err.
			if _, err := Scheduler.QueueTask(task); err != nil {
				logrus.Errorf("recovery task error:%v", err)
				continue
			}
			logrus.Infof("worker:%v recovery task:%v type:%v ", workerClient.Name, task.ID, task.Type)
		}
	}()
	return &resp, nil
}