in vermeer/apps/worker/service.go [237:382]
// ReconnectMaster re-establishes the worker's session with the master.
//
// It holds s.locker for the whole call. If CheckMasterAlive reports true it
// returns immediately. Otherwise it re-initializes the worker-side managers,
// dials the master, says hello to obtain this worker's id and name, connects
// to every peer listed in the hello response that PeerMgr does not know yet,
// and finally re-creates the load-graph, compute-task and super-step streams
// on the master client.
//
// Failures while talking to the master are logged and abort the reconnect
// (the method returns nothing, so callers cannot observe them directly).
// Failures while talking to a peer are reported through logrus.Fatalf.
func (s *Service) ReconnectMaster() {
	s.locker.Lock()
	defer s.locker.Unlock()

	if s.CheckMasterAlive() {
		logrus.Infof("master is alive:%v", s.masterConn.GetState())
		return
	}
	logrus.Infof("try to reconnect master")

	// Re-initialize all worker-side managers before rebuilding the session.
	SpaceMgr.Init()
	GraphMgr.Init()
	TaskMgr.Init()
	LoadGraphMgr.Init()
	ComputeTaskMgr.Init()
	PeerMgr.Init()
	//common.PrometheusMetrics.TaskCnt.Reset()
	common.PrometheusMetrics.TaskRunningCnt.Reset()
	//common.PrometheusMetrics.GraphCnt.Reset()
	//common.PrometheusMetrics.GraphLoadedCnt.Reset()

	masterPeer := common.GetConfig("master_peer").(string)
	grpcPeer := common.GetConfig("grpc_peer").(string)

	// The same dial options are used for the master and for every peer.
	dialOptions := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(4*1024*1024*1024),
			grpc.MaxCallRecvMsgSize(4*1024*1024*1024),
		),
		grpc.WithBlock(),
		grpc.WithIdleTimeout(0),
	}

	// Connect to the master.
	// NOTE(review): any previous s.masterConn is overwritten here without
	// being closed — confirm whether it is released elsewhere.
	var err error
	s.masterConn, err = grpc.Dial(masterPeer, dialOptions...)
	if err != nil {
		logrus.Errorf("connect master error: %s", err)
		return
	}
	s.MasterClient = pb.NewMasterClient(s.masterConn)

	ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel1()
	resp, err := s.MasterClient.SayHelloMaster(
		ctx, &pb.HelloMasterReq{WorkerPeer: grpcPeer, Version: version.Version})
	if err != nil {
		logrus.Infof("reconnect say hello master error:%v", err)
		// Close the connection we just opened before dropping the reference;
		// otherwise every failed attempt leaks a ClientConn.
		if closeErr := s.masterConn.Close(); closeErr != nil {
			logrus.Warnf("close master connection error: %s", closeErr)
		}
		s.masterConn = nil
		return
	}

	s.WorkerId = resp.GetWorkerId()
	s.WorkerName = resp.GetWorkerName()
	logrus.Infof("Reconnect, I am worker %s", s.WorkerName)
	PeerMgr.AddPeer(s.WorkerName, s.WorkerId, grpcPeer)

	md := metadata.New(map[string]string{"worker_name": s.WorkerName})
	for _, workerInfo := range resp.GetWorkers() {
		// Skip peers that PeerMgr already knows. For unknown peers, establish
		// the gRPC connection and start the handlers.
		if PeerMgr.GetPeer(workerInfo.GetName()) != nil {
			continue
		}
		PeerMgr.AddPeer(workerInfo.Name, workerInfo.Id, workerInfo.GrpcPeer)
		peerClient := PeerMgr.GetPeer(workerInfo.Name)
		peerClient.peerConn, err = grpc.Dial(workerInfo.GrpcPeer, dialOptions...)
		if err != nil {
			logrus.Fatalf("connect to peer %s error: %s", workerInfo.GrpcPeer, err)
			return
		}
		peerClient.peerSession = pb.NewWorkerClient(peerClient.peerConn)

		// The timeout only guards the unary hello call, so release it as soon
		// as that call has returned.
		helloCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
		helloResp, err := peerClient.peerSession.SayHelloPeer(
			helloCtx, &pb.HelloPeerReq{SourceName: s.WorkerName, TargetName: workerInfo.Name, WorkerPeer: grpcPeer})
		cancel2()
		if err != nil {
			logrus.Fatalf("say hello peer %s error: %s", workerInfo.GrpcPeer, err)
		}

		// install scatter
		stream, err := peerClient.peerSession.Scatter(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create scatter stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.ScatterHandler.SetClient(stream)

		// install load action
		stream2, err := peerClient.peerSession.LoadAction(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create load action stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.LoadActionHandler.SetClient(stream2)

		// install step end
		stream3, err := peerClient.peerSession.StepEnd(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create step end stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.StepEndHandler.SetClient(stream3)

		// install setting action
		stream4, err := peerClient.peerSession.SettingAction(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create setting action stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.SettingActionHandler.SetClient(stream4)

		logrus.Infof("say hello peer %s, ok, %s", workerInfo.GrpcPeer, helloResp.GetStatus())

		// Start a receive loop for every handler that is in client mode.
		if peerClient.ScatterHandler.mode == HandlerModeClient {
			go peerClient.ScatterHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.LoadActionHandler.mode == HandlerModeClient {
			go peerClient.LoadActionHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.StepEndHandler.mode == HandlerModeClient {
			go peerClient.StepEndHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.SettingActionHandler.mode == HandlerModeClient {
			go peerClient.SettingActionHandler.RecvHandler(peerClient.Name)
		}
	}

	// Re-create the streams towards the master. Each stream is only stored on
	// its handler after it has been created successfully.
	ctx = metadata.NewOutgoingContext(context.Background(), md)
	tempLoadGraphGrpcStream, err := s.MasterClient.LoadGraphTask(ctx)
	if err != nil {
		logrus.Errorf("reconnect create load graph task stream error: %s", err)
		return
	}
	s.LoadGraphHandler.grpcStream = tempLoadGraphGrpcStream
	logrus.Infof("reconnect create load graph task stream ok")

	ctx = metadata.NewOutgoingContext(context.Background(), md)
	tempComputeGrpcStream, err := s.MasterClient.ComputeTask(ctx)
	if err != nil {
		logrus.Errorf("reconnect create compute task stream error: %s", err)
		return
	}
	s.ComputeTaskHandler.grpcStream = tempComputeGrpcStream
	logrus.Infof("reconnect create compute task stream ok")

	ctx = metadata.NewOutgoingContext(context.Background(), md)
	stepReq := pb.SuperStepReq{}
	tempSuperStepGrpcStream, err := s.MasterClient.SuperStep(ctx, &stepReq)
	if err != nil {
		logrus.Errorf("reconnect create super step stream error: %s", err)
		return
	}
	s.SuperStepHandler.grpcStream = tempSuperStepGrpcStream
	logrus.Infof("reconnect create super step stream ok")
}