in vermeer/apps/worker/service.go [60:197]
// Init connects this worker to the master and to every peer worker the master
// reports, opens the long-lived gRPC streams, and initializes the local
// managers.
//
// Steps, in order:
//  1. Create the compute/load-graph/super-step handlers and read the
//     "master_peer" and "grpc_peer" config values. Both are asserted to be
//     strings without a check, so a missing or non-string value panics.
//  2. Dial the master (blocking, insecure transport) and call SayHelloMaster
//     with a 5 second timeout; the reply supplies this worker's id and name.
//  3. For each worker in the reply: register it in PeerMgr, dial it, call
//     SayHelloPeer (5 second timeout) and open the Scatter, LoadAction,
//     StepEnd and SettingAction streams.
//  4. Open the LoadGraphTask, ComputeTask and SuperStep streams on the master.
//  5. Initialize the space/graph/task/algorithm managers, register the
//     built-in algorithms, load plugins and create the service mutex.
//
// Failures talking to the master are logged and returned. Failures talking to
// a peer are reported through logrus.Fatalf, which exits the process with
// logrus's default exit handler; the error is also returned so Init never
// continues with a half-initialized peer if that handler has been replaced.
func (s *Service) Init() error {
	var err error
	//node, err := snowflake.NewNode(rand.Int63n(1023))
	//if err != nil {
	//	logrus.Errorf("new snowflake error: %s", err)
	//}
	//s.WorkerName = node.Generate().String()
	s.ComputeTaskHandler = &ComputeTaskHandler{}
	s.LoadGraphHandler = &LoadGraphTaskHandler{}
	s.SuperStepHandler = &SuperStepHandler{}
	masterPeer := common.GetConfig("master_peer").(string)
	grpcPeer := common.GetConfig("grpc_peer").(string)

	// init peer manager
	PeerMgr.Init()

	// The same dial options are used for the master and for every peer:
	// insecure transport, raised per-call message size limits, a blocking
	// dial, and an idle timeout of 0.
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(4*1024*1024*1024),
			grpc.MaxCallRecvMsgSize(4*1024*1024*1024)),
		grpc.WithBlock(),
		grpc.WithIdleTimeout(0),
	}

	s.masterConn, err = grpc.Dial(masterPeer, dialOpts...)
	if err != nil {
		logrus.Errorf("connect master error: %s", err)
		return err
	}
	s.MasterClient = pb.NewMasterClient(s.masterConn)

	helloCtx, cancelHello := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelHello()
	resp, err := s.MasterClient.SayHelloMaster(
		helloCtx, &pb.HelloMasterReq{WorkerPeer: grpcPeer, Version: version.Version})
	if err != nil {
		logrus.Errorf("say hello master error: %s", err)
		return err
	}
	s.WorkerId = resp.GetWorkerId()
	s.WorkerName = resp.GetWorkerName()
	logrus.Infof("Hello World, I am worker %s", s.WorkerName)
	PeerMgr.AddPeer(s.WorkerName, s.WorkerId, grpcPeer)

	// Every stream carries the worker name as outgoing metadata.
	md := metadata.New(map[string]string{"worker_name": s.WorkerName})

	for _, w := range resp.GetWorkers() {
		PeerMgr.AddPeer(w.Name, w.Id, w.GrpcPeer)
		peerClient := PeerMgr.GetPeer(w.Name)
		peerClient.peerConn, err = grpc.Dial(w.GrpcPeer, dialOpts...)
		if err != nil {
			logrus.Fatalf("connect to peer %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.peerSession = pb.NewWorkerClient(peerClient.peerConn)

		// The timeout only guards the unary hello call, so release it as soon
		// as the call returns instead of holding it for the whole iteration.
		peerCtx, cancelPeer := context.WithTimeout(context.Background(), 5*time.Second)
		helloResp, err := peerClient.peerSession.SayHelloPeer(
			peerCtx, &pb.HelloPeerReq{SourceName: s.WorkerName, TargetName: w.Name, WorkerPeer: grpcPeer})
		cancelPeer()
		if err != nil {
			logrus.Fatalf("say hello peer %s error: %s", w.GrpcPeer, err)
			return err
		}

		// install scatter
		scatterStream, err := peerClient.peerSession.Scatter(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create scatter stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.ScatterHandler.SetClient(scatterStream)

		// install load action
		loadActionStream, err := peerClient.peerSession.LoadAction(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create load action stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.LoadActionHandler.SetClient(loadActionStream)

		// install step end
		stepEndStream, err := peerClient.peerSession.StepEnd(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create step end stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.StepEndHandler.SetClient(stepEndStream)

		// install setting action
		settingStream, err := peerClient.peerSession.SettingAction(
			metadata.NewOutgoingContext(context.Background(), md))
		if err != nil {
			logrus.Fatalf("create setting action stream peer: %s error: %s", w.GrpcPeer, err)
			return err
		}
		peerClient.SettingActionHandler.SetClient(settingStream)

		logrus.Infof("say hello peer %s, ok, %s", w.GrpcPeer, helloResp.GetStatus())
	}
	logrus.Infof("say hello master ok, worker id: %d", s.WorkerId)

	// Streams to the master use fresh background contexts (no deadline) so
	// they are not tied to the 5 second hello timeout.
	s.LoadGraphHandler.grpcStream, err = s.MasterClient.LoadGraphTask(
		metadata.NewOutgoingContext(context.Background(), md))
	if err != nil {
		logrus.Errorf("create load graph task stream error: %s", err)
		return err
	}
	logrus.Infof("create load graph task stream ok")

	s.ComputeTaskHandler.grpcStream, err = s.MasterClient.ComputeTask(
		metadata.NewOutgoingContext(context.Background(), md))
	if err != nil {
		logrus.Errorf("create compute task stream error: %s", err)
		return err
	}
	logrus.Infof("create compute task stream ok")

	s.SuperStepHandler.grpcStream, err = s.MasterClient.SuperStep(
		metadata.NewOutgoingContext(context.Background(), md), &pb.SuperStepReq{})
	if err != nil {
		logrus.Errorf("create super step stream error: %s", err)
		return err
	}
	logrus.Infof("create super step stream ok")

	SpaceMgr.Init()
	GraphMgr.Init()
	TaskMgr.Init()
	LoadGraphMgr.Init()
	ComputeTaskMgr.Init()
	AlgorithmMgr.Init()
	for _, maker := range algorithms.Algorithms {
		AlgorithmMgr.Register(maker, "built-in")
	}
	AlgorithmMgr.LoadPlugins()
	s.locker = &sync.Mutex{}
	//	for _, space := range resp.Spaces {
	//		err := createSpaceIfAbsent(space)
	//		if err != nil {
	//			logrus.Errorf("create space error:%v", err)
	//			return err
	//		}
	//	}
	return nil
}