in client.go [115:180]
// newClient validates cfg, applies opts, and wires up the pieces the client
// depends on, in this order: the OpenAPI client, server discovery for
// cfg.GroupId, the single-connection pool, the task master pool, the actors,
// and finally the heartbeat and message-receiving goroutines.
//
// It returns an error when cfg is nil or fails IsValid, when no connection can
// be obtained from the pool, or when actor initialization fails. Underlying
// errors are wrapped with %w so callers can inspect them with errors.Is /
// errors.As.
//
// NOTE(review): the error paths do not tear down what was already started
// (server discovery, connection pool); confirm whether those packages expose a
// shutdown hook that should be called here.
func newClient(cfg *Config, opts ...Option) (*Client, error) {
	// Guard against a nil cfg before calling a method on it and before the
	// field reads below.
	if cfg == nil || !cfg.IsValid() {
		return nil, fmt.Errorf("invalid console config, cfg=%+v", cfg)
	}
	ctx := context.Background()

	options := new(Options)
	for _, opt := range opts {
		opt(options)
	}

	// Init discovery
	openAPIClient := openapi.NewClient(
		openapi.WithHTTPClient(http.Client{Timeout: 3 * time.Second}),
		openapi.WithNamespace(cfg.Namespace),
		openapi.WithGroupId(cfg.GroupId),
		openapi.WithOpenAPIDomain(cfg.DomainName),
		openapi.WithOpenAPIEndpoint(cfg.Endpoint),
		openapi.WithAppKey(cfg.AppKey),
	)
	openapi.InitOpenAPIClient(openAPIClient)
	discovery.GetGroupManager().StartServerDiscovery(cfg.GroupId, cfg.AppKey)
	serverDiscover := discovery.GetDiscovery(cfg.GroupId)

	// Init connection pool
	dialer := func() (net.Conn, error) {
		// Read the active server once so the address that is logged is
		// exactly the address that is dialed.
		addr := serverDiscover.ActiveServer()
		logger.Infof("SchedulerX discovery active server addr=%s", addr)
		return net.DialTimeout("tcp", addr, 500*time.Millisecond)
	}
	singleConnPool := pool.NewSingleConnPool(ctx, dialer,
		pool.WithPostDialer(remoting.Handshake),
		pool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh()))
	pool.InitConnPool(singleConnPool)

	// Fetch a connection up front so an unreachable server is reported to
	// the caller instead of surfacing later in the background goroutines.
	conn, err := singleConnPool.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot connect schedulerx server, maybe network was broken, err=%w", err)
	}
	logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())

	taskMap := tasks.GetTaskMap()
	masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(taskMap))

	// Init actors
	actorSystem := actorcomm.GetActorSystem()
	if err := sxactor.InitActors(actorSystem); err != nil {
		return nil, fmt.Errorf("init actors faild, err=%w", err)
	}

	// Buffered with capacity 1 so a signal delivered before a receiver is
	// ready is not dropped by signal.Notify.
	stopChan := make(chan os.Signal, 1)
	signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

	// Keep heartbeat, and receive messages.
	// KeepHeartbeat must be started after the actors are initialized, so that
	// it can get the actor system port from actorSystem.
	go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey, stopChan)
	go remoting.OnMsgReceived(ctx)

	return &Client{
		cfg:      cfg,
		opts:     options,
		tasks:    taskMap,
		stopChan: stopChan,
	}, nil
}