func()

in vermeer/apps/worker/service.go [60:197]


// Init wires the worker into the cluster and prepares its local managers.
//
// In order, it:
//  1. creates the task/step handlers and reads the "master_peer" and
//     "grpc_peer" configuration values;
//  2. dials the master (blocking, insecure transport) and announces itself
//     with SayHelloMaster, storing the worker id and name from the reply;
//  3. registers itself and every worker listed in the reply with PeerMgr,
//     dials each of those peers, greets it with SayHelloPeer and opens the
//     Scatter, LoadAction, StepEnd and SettingAction streams to it;
//  4. opens the LoadGraphTask, ComputeTask and SuperStep streams to the master;
//  5. initializes the space/graph/task/algorithm managers, registers the
//     built-in algorithms, loads plugins and creates the service mutex.
//
// Failures talking to the master are logged and returned. Failures talking to
// a peer are logged with logrus.Fatalf (which exits the process with the
// default logrus exit function); the error is also returned so that Init never
// continues with a nil connection or stream if the exit function is overridden.
func (s *Service) Init() error {
	var err error
	//node, err := snowflake.NewNode(rand.Int63n(1023))
	//if err != nil {
	//	logrus.Errorf("new snowflake error: %s", err)
	//}
	//s.WorkerName = node.Generate().String()
	s.ComputeTaskHandler = &ComputeTaskHandler{}
	s.LoadGraphHandler = &LoadGraphTaskHandler{}
	s.SuperStepHandler = &SuperStepHandler{}
	masterPeer := common.GetConfig("master_peer").(string)
	grpcPeer := common.GetConfig("grpc_peer").(string)
	// init peer manager
	PeerMgr.Init()

	// The same dial options are used for the master and for every peer.
	const maxMsgSize = 4 * 1024 * 1024 * 1024
	dialOptions := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(maxMsgSize),
			grpc.MaxCallRecvMsgSize(maxMsgSize)),
		grpc.WithBlock(),
		grpc.WithIdleTimeout(0),
	}

	s.masterConn, err = grpc.Dial(masterPeer, dialOptions...)
	if err != nil {
		logrus.Errorf("connect master error: %s", err)
		return err
	}
	s.MasterClient = pb.NewMasterClient(s.masterConn)

	// The timeout only guards the unary hello call, so release it right away.
	helloCtx, cancelHello := context.WithTimeout(context.Background(), 5*time.Second)
	resp, err := s.MasterClient.SayHelloMaster(
		helloCtx, &pb.HelloMasterReq{WorkerPeer: grpcPeer, Version: version.Version})
	cancelHello()
	if err != nil {
		logrus.Errorf("say hello master error: %s", err)
		return err
	}
	s.WorkerId = resp.GetWorkerId()
	s.WorkerName = resp.GetWorkerName()
	logrus.Infof("Hello World, I am worker %s", s.WorkerName)
	PeerMgr.AddPeer(s.WorkerName, s.WorkerId, grpcPeer)

	// Every long-lived stream carries the worker name as outgoing metadata and
	// has no deadline; the context is immutable, so one instance is shared.
	md := metadata.New(map[string]string{"worker_name": s.WorkerName})
	streamCtx := metadata.NewOutgoingContext(context.Background(), md)

	for _, w := range resp.GetWorkers() {
		PeerMgr.AddPeer(w.Name, w.Id, w.GrpcPeer)
		peerClient := PeerMgr.GetPeer(w.Name)
		peerClient.peerConn, err = grpc.Dial(w.GrpcPeer, dialOptions...)
		if err != nil {
			logrus.Fatalf("connect to peer %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.peerSession = pb.NewWorkerClient(peerClient.peerConn)

		peerCtx, cancelPeer := context.WithTimeout(context.Background(), 5*time.Second)
		peerResp, err := peerClient.peerSession.SayHelloPeer(
			peerCtx, &pb.HelloPeerReq{SourceName: s.WorkerName, TargetName: w.Name, WorkerPeer: grpcPeer})
		cancelPeer()
		if err != nil {
			logrus.Fatalf("say hello peer %s error: %s", w.GrpcPeer, err)
			return err
		}

		// install scatter
		scatterStream, err := peerClient.peerSession.Scatter(streamCtx)
		if err != nil {
			logrus.Fatalf("create scatter stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.ScatterHandler.SetClient(scatterStream)

		// install load action
		loadActionStream, err := peerClient.peerSession.LoadAction(streamCtx)
		if err != nil {
			logrus.Fatalf("create load action stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.LoadActionHandler.SetClient(loadActionStream)

		// install step end
		stepEndStream, err := peerClient.peerSession.StepEnd(streamCtx)
		if err != nil {
			logrus.Fatalf("create step end stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.StepEndHandler.SetClient(stepEndStream)

		// install setting action
		settingStream, err := peerClient.peerSession.SettingAction(streamCtx)
		if err != nil {
			logrus.Fatalf("create setting action stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.SettingActionHandler.SetClient(settingStream)

		logrus.Infof("say hello peer %s, ok, %s", w.GrpcPeer, peerResp.GetStatus())
	}
	logrus.Infof("say hello master ok, worker id: %d", s.WorkerId)

	s.LoadGraphHandler.grpcStream, err = s.MasterClient.LoadGraphTask(streamCtx)
	if err != nil {
		logrus.Errorf("create load graph task stream error: %s", err)
		return err
	}
	logrus.Infof("create load graph task stream ok")

	s.ComputeTaskHandler.grpcStream, err = s.MasterClient.ComputeTask(streamCtx)
	if err != nil {
		logrus.Errorf("create compute task stream error: %s", err)
		return err
	}
	logrus.Infof("create compute task stream ok")

	stepReq := pb.SuperStepReq{}
	s.SuperStepHandler.grpcStream, err = s.MasterClient.SuperStep(streamCtx, &stepReq)
	if err != nil {
		logrus.Errorf("create super step stream error: %s", err)
		return err
	}
	logrus.Infof("create super step stream ok")

	SpaceMgr.Init()
	GraphMgr.Init()
	TaskMgr.Init()
	LoadGraphMgr.Init()
	ComputeTaskMgr.Init()
	AlgorithmMgr.Init()
	for _, maker := range algorithms.Algorithms {
		AlgorithmMgr.Register(maker, "built-in")
	}
	AlgorithmMgr.LoadPlugins()
	s.locker = &sync.Mutex{}
	// for _, space := range resp.Spaces {
	// 	err := createSpaceIfAbsent(space)
	// 	if err != nil {
	// 		logrus.Errorf("create space error:%v", err)
	// 		return err
	// 	}
	// }
	return nil
}