in vermeer/apps/master/bl/grpc_handlers.go [51:192]
// SayHelloMaster registers a worker with the master.
//
// The worker's address is built from the IP observed on the incoming gRPC
// connection plus the port the worker reports in req.WorkerPeer. Registration
// is denied when req.Version differs from the master's version.Version. If a
// worker with the same address is still registered, the call waits (polling
// once per second) until that entry disappears, or until ctx is done.
//
// On success the master dials back to the worker and returns the new worker's
// ID and name, the other registered workers, and the names of all spaces.
// For each of this worker's graphs in the InComplete state whose workers are
// all alive again, the graph is set to OnDisk; its waiting tasks are re-queued
// asynchronously (in ID order, after a short delay) and its other unfinished
// tasks are marked as errored and finished.
//
// The whole registration runs under h.locker, so concurrent calls are
// serialized.
func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterReq) (*pb.HelloMasterResp, error) {
	h.locker.Lock()
	defer h.locker.Unlock()

	// failHello logs err and returns it together with a failing base response.
	failHello := func(err error) (*pb.HelloMasterResp, error) {
		logrus.Error(err)
		return &pb.HelloMasterResp{Base: &pb.BaseResponse{ErrorCode: -1, Message: err.Error()}}, err
	}

	// Check the worker version; deny registration on mismatch.
	if req.Version != version.Version {
		return failHello(fmt.Errorf("registration denied: inconsistent registration version! master version %v, worker version %v", version.Version, req.Version))
	}

	p, ok := peer.FromContext(ctx)
	if !ok || p == nil || p.Addr == nil {
		return failHello(fmt.Errorf("worker grpc peer error: no peer address in context, worker peer %s", req.GetWorkerPeer()))
	}

	// Split on the last ':' rather than using net.SplitHostPort. For a TCP peer
	// such as "[::1]:1234" this keeps the brackets on the host part, so
	// ip+":"+port stays a dialable address.
	peerAddr := p.Addr.String()
	semi := strings.LastIndex(peerAddr, ":")
	if semi < 0 {
		return failHello(fmt.Errorf("worker grpc peer error: %s", peerAddr))
	}
	ip := peerAddr[:semi]
	semi = strings.LastIndex(req.GetWorkerPeer(), ":")
	if semi < 0 {
		return failHello(fmt.Errorf("worker grpc peer error: %s", req.GetWorkerPeer()))
	}
	port := req.GetWorkerPeer()[semi+1:]
	workerAddr := ip + ":" + port

	workers := workerMgr.GetAllWorkers()
	var existWorkerName string
	for _, worker := range workers {
		if worker.GrpcPeer == workerAddr {
			existWorkerName = worker.Name
			break
		}
	}
	if existWorkerName != "" {
		// The worker already exists: wait until the gRPC recv handler notices the
		// old connection and deletes the worker. Give up if the caller cancels,
		// since h.locker is held for the whole wait.
		for workerMgr.GetWorker(existWorkerName) != nil {
			logrus.Infof("worker %v exist, wait one second", existWorkerName)
			select {
			case <-ctx.Done():
				return failHello(fmt.Errorf("waiting for existing worker %v to be removed: %w", existWorkerName, ctx.Err()))
			case <-time.After(1 * time.Second):
			}
		}
		// Refresh the snapshot so workers removed while waiting are not
		// advertised to the new worker.
		workers = workerMgr.GetAllWorkers()
	}

	reqWorker, err := workerMgr.CreateWorker(workerAddr, ip, req.Version)
	if err != nil {
		logrus.Errorf("failed to create a WorkerClient, error: %s", err)
		failResp := &pb.HelloMasterResp{WorkerId: -1}
		// reqWorker may be nil when CreateWorker fails.
		if reqWorker != nil {
			failResp.WorkerName = reqWorker.Name
		}
		return failResp, err
	}
	_, err = workerMgr.AddWorker(reqWorker)
	if err != nil {
		logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err)
		return &pb.HelloMasterResp{}, err
	}
	logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, peerAddr)

	resp := pb.HelloMasterResp{
		WorkerId:   reqWorker.Id,
		WorkerName: reqWorker.Name,
	}
	for _, v := range workers {
		if v.GrpcPeer == workerAddr {
			logrus.Debugf("workerMgr has a duplicate address, ignore it")
			continue
		}
		resp.Workers = append(resp.Workers, &pb.WorkerInfo{
			Name:     v.Name,
			Id:       v.Id,
			GrpcPeer: v.GrpcPeer,
		})
	}

	// Connect back to the worker.
	workerClient := workerMgr.GetWorker(reqWorker.Name)
	if workerClient == nil {
		err = fmt.Errorf("worker %v not found after registration", reqWorker.Name)
		logrus.Error(err)
		return &resp, err
	}
	dialOptions := grpc.WithDefaultCallOptions(
		grpc.MaxCallSendMsgSize(1*1024*1024*1024),
		grpc.MaxCallRecvMsgSize(1*1024*1024*1024))
	// WithBlock makes the dial wait for the connection. Binding it to ctx lets
	// cancellation or the caller's deadline abort a dial to an unreachable
	// worker instead of holding h.locker indefinitely. ctx only governs the dial
	// itself, not the lifetime of the resulting connection.
	workerClient.Connection, err = grpc.DialContext(ctx,
		workerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), dialOptions, grpc.WithBlock(), grpc.WithIdleTimeout(0))
	if err != nil {
		// NOTE(review): the worker stays registered in workerMgr without a
		// connection here; confirm whether it should be removed.
		logrus.Errorf("connect worker error: %s", err)
		return &resp, err
	}
	for _, space := range structure.SpaceManager.GetAllSpace() {
		resp.Spaces = append(resp.Spaces, space.Name())
	}
	workerClient.Session = pb.NewWorkerClient(workerClient.Connection)
	initWorkerClient(workerClient)

	// Recover graphs hosted on this worker whose workers are all alive again.
	var taskCanRecovery []*structure.TaskInfo
	for _, graph := range graphMgr.GetGraphsByWorker(reqWorker.Name) {
		if graph.State != structure.GraphStateInComplete {
			continue
		}
		workerAllAlive := true
		for _, worker := range graph.Workers {
			if !workerMgr.CheckWorkerAlive(worker.Name) {
				workerAllAlive = false
				break
			}
		}
		if !workerAllAlive {
			continue
		}
		graph.SetState(structure.GraphStateOnDisk) // InComplete is not a persistent state
		for _, task := range taskMgr.GetTaskByGraph(graph.SpaceName, graph.Name) {
			if task.State == structure.TaskStateWaiting {
				logrus.Infof("waiting taskid:%v", task.ID)
				taskCanRecovery = append(taskCanRecovery, task)
			} else if task.State != structure.TaskStateComplete && task.State != structure.TaskStateLoaded && task.State != structure.TaskStateError {
				// Tasks that were mid-flight cannot be resumed; fail them.
				taskMgr.SetError(task, fmt.Errorf("recovery task error, task state:%v", task.State).Error())
				if finishErr := taskMgr.FinishTask(task.ID); finishErr != nil {
					logrus.Errorf("finish task error: %s", finishErr)
				}
			}
		}
	}

	// Re-queue recovered tasks in ID order. This runs after the handler returns,
	// so it must not use ctx or the outer err.
	go func() {
		sort.Slice(taskCanRecovery, func(i, j int) bool {
			return taskCanRecovery[i].ID < taskCanRecovery[j].ID
		})
		// NOTE(review): the fixed delay presumably gives the newly registered
		// worker time to finish initialising — confirm.
		time.Sleep(3 * time.Second)
		for _, task := range taskCanRecovery {
			if _, err := Scheduler.QueueTask(task); err != nil {
				logrus.Errorf("recovery task error:%v", err)
				continue
			}
			logrus.Infof("worker:%v recovery task:%v type:%v ", workerClient.Name, task.ID, task.Type)
		}
	}()
	return &resp, nil
}