func()

in vermeer/apps/worker/service.go [237:382]


// ReconnectMaster rebuilds this worker's session with the master.
//
// The method holds s.locker for its whole duration. If CheckMasterAlive
// reports that the master is still reachable it returns immediately.
// Otherwise it:
//
//  1. re-initializes the worker-side managers and resets the running-task
//     metric;
//  2. dials the master (blocking dial) and announces itself with
//     SayHelloMaster, storing the returned worker id and name;
//  3. for every worker returned by the master that is not yet registered in
//     PeerMgr, dials the peer, says hello, installs the scatter, load-action,
//     step-end and setting-action streams, and starts the receive loops of
//     the handlers that are in client mode;
//  4. re-creates the load-graph, compute-task and super-step streams to the
//     master.
//
// Failures while talking to the master are logged and abort the reconnect;
// failures while talking to a peer are reported through logrus.Fatalf, which
// by default terminates the process after logging.
func (s *Service) ReconnectMaster() {
	s.locker.Lock()
	defer s.locker.Unlock()
	if s.CheckMasterAlive() {
		logrus.Infof("master is alive:%v", s.masterConn.GetState())
		return
	}
	logrus.Infof("try to reconnect master")
	// Re-initialize worker-side state before rebuilding the session.
	SpaceMgr.Init()
	GraphMgr.Init()
	TaskMgr.Init()
	LoadGraphMgr.Init()
	ComputeTaskMgr.Init()
	PeerMgr.Init()
	//common.PrometheusMetrics.TaskCnt.Reset()
	common.PrometheusMetrics.TaskRunningCnt.Reset()
	//common.PrometheusMetrics.GraphCnt.Reset()
	//common.PrometheusMetrics.GraphLoadedCnt.Reset()
	masterPeer := common.GetConfig("master_peer").(string)
	grpcPeer := common.GetConfig("grpc_peer").(string)
	// Connect to the master.
	dialOptions := grpc.WithDefaultCallOptions(
		grpc.MaxCallSendMsgSize(4*1024*1024*1024),
		grpc.MaxCallRecvMsgSize(4*1024*1024*1024),
	)
	masterConn, err := grpc.Dial(
		masterPeer, grpc.WithTransportCredentials(insecure.NewCredentials()), dialOptions, grpc.WithBlock(), grpc.WithIdleTimeout(0))
	s.masterConn = masterConn
	if err != nil {
		logrus.Errorf("connect master error: %s", err)
		return
	}
	s.MasterClient = pb.NewMasterClient(masterConn)
	ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel1()
	resp, err := s.MasterClient.SayHelloMaster(
		ctx, &pb.HelloMasterReq{WorkerPeer: grpcPeer, Version: version.Version})
	if err != nil {
		logrus.Infof("reconnect say hello master error:%v", err)
		// Release the freshly dialed connection before dropping the reference;
		// otherwise it would stay open with nothing left to close it.
		if closeErr := masterConn.Close(); closeErr != nil {
			logrus.Errorf("close master connection error: %s", closeErr)
		}
		s.masterConn = nil
		return
	}
	s.WorkerId = resp.GetWorkerId()
	s.WorkerName = resp.GetWorkerName()
	logrus.Infof("Reconnect, I am worker %s", s.WorkerName)
	PeerMgr.AddPeer(s.WorkerName, s.WorkerId, grpcPeer)
	md := metadata.New(map[string]string{"worker_name": s.WorkerName})
	for _, workerInfo := range resp.GetWorkers() {
		// Skip peers that are already registered. For unknown peers, establish
		// the gRPC connection and start the handlers.
		peer := PeerMgr.GetPeer(workerInfo.GetName())
		if peer != nil {
			continue
		}
		PeerMgr.AddPeer(workerInfo.Name, workerInfo.Id, workerInfo.GrpcPeer)
		peerClient := PeerMgr.GetPeer(workerInfo.Name)
		peerClient.peerConn, err = grpc.Dial(
			workerInfo.GrpcPeer, grpc.WithTransportCredentials(insecure.NewCredentials()), dialOptions, grpc.WithBlock(), grpc.WithIdleTimeout(0))
		if err != nil {
			logrus.Fatalf("connect to peer %s error: %s", workerInfo.GrpcPeer, err)
			return
		}
		peerClient.peerSession = pb.NewWorkerClient(peerClient.peerConn)

		// The timeout only guards the hello call, so release it as soon as the
		// call returns instead of keeping it alive during stream setup.
		helloCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
		helloResp, err := peerClient.peerSession.SayHelloPeer(
			helloCtx, &pb.HelloPeerReq{SourceName: s.WorkerName, TargetName: workerInfo.Name, WorkerPeer: grpcPeer})
		cancel2()
		if err != nil {
			logrus.Fatalf("say hello peer %s error: %s", workerInfo.GrpcPeer, err)
		}

		// install scatter
		streamCtx := metadata.NewOutgoingContext(context.Background(), md)
		stream, err := peerClient.peerSession.Scatter(streamCtx)
		if err != nil {
			logrus.Fatalf("create scatter stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.ScatterHandler.SetClient(stream)

		// install load action
		streamCtx = metadata.NewOutgoingContext(context.Background(), md)
		stream2, err := peerClient.peerSession.LoadAction(streamCtx)
		if err != nil {
			logrus.Fatalf("create load action stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.LoadActionHandler.SetClient(stream2)

		// install step end
		streamCtx = metadata.NewOutgoingContext(context.Background(), md)
		stream3, err := peerClient.peerSession.StepEnd(streamCtx)
		if err != nil {
			logrus.Fatalf("create step end stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.StepEndHandler.SetClient(stream3)

		// install setting action
		streamCtx = metadata.NewOutgoingContext(context.Background(), md)
		stream4, err := peerClient.peerSession.SettingAction(streamCtx)
		if err != nil {
			logrus.Fatalf("create setting action stream peer: %s error: %s", workerInfo.GrpcPeer, err)
		}
		peerClient.SettingActionHandler.SetClient(stream4)

		logrus.Infof("say hello peer %s, ok, %s", workerInfo.GrpcPeer, helloResp.GetStatus())
		// Start a receive loop only for handlers that are in client mode.
		if peerClient.ScatterHandler.mode == HandlerModeClient {
			go peerClient.ScatterHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.LoadActionHandler.mode == HandlerModeClient {
			go peerClient.LoadActionHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.StepEndHandler.mode == HandlerModeClient {
			go peerClient.StepEndHandler.RecvHandler(peerClient.Name)
		}
		if peerClient.SettingActionHandler.mode == HandlerModeClient {
			go peerClient.SettingActionHandler.RecvHandler(peerClient.Name)
		}
	}

	// Re-create the master-facing streams. Each uses its own background
	// context carrying the worker name as outgoing metadata.
	ctx = metadata.NewOutgoingContext(context.Background(), md)
	tempLoadGraphGrpcStream, err := s.MasterClient.LoadGraphTask(ctx)
	if err != nil {
		logrus.Errorf("reconnect create load graph task stream error: %s", err)
		return
	}
	s.LoadGraphHandler.grpcStream = tempLoadGraphGrpcStream
	logrus.Infof("reconnect create load graph task stream ok")

	ctx = metadata.NewOutgoingContext(context.Background(), md)
	tempComputeGrpcStream, err := s.MasterClient.ComputeTask(ctx)
	if err != nil {
		logrus.Errorf("reconnect create compute task stream error: %s", err)
		return
	}
	s.ComputeTaskHandler.grpcStream = tempComputeGrpcStream
	logrus.Infof("reconnect create compute task stream ok")

	ctx = metadata.NewOutgoingContext(context.Background(), md)
	stepReq := pb.SuperStepReq{}
	tempSuperStepGrpcStream, err := s.MasterClient.SuperStep(ctx, &stepReq)
	if err != nil {
		logrus.Errorf("reconnect create super step stream error: %s", err)
		return
	}
	s.SuperStepHandler.grpcStream = tempSuperStepGrpcStream
	logrus.Infof("reconnect create super step stream ok")
}