func newClient()

in client.go [115:180]


// newClient validates cfg, applies opts, and wires up the runtime pieces a
// Client depends on before returning it.
//
// In order, it:
//  1. builds an OpenAPI client (3s HTTP timeout) from cfg and starts server
//     discovery for cfg.GroupId;
//  2. creates a single-connection pool whose dialer connects to the currently
//     active server reported by discovery (500ms dial timeout), and eagerly
//     fetches a connection so an unreachable server is reported immediately;
//  3. initializes the task master pool and the actors;
//  4. subscribes stopChan to SIGINT/SIGTERM and starts the heartbeat and
//     message-receiving goroutines.
//
// It returns an error when cfg is nil or invalid, when the initial connection
// cannot be obtained, or when actor initialization fails. Underlying errors
// are wrapped, so callers may inspect them with errors.Is / errors.As.
//
// NOTE(review): the Init* calls (openapi, pool, masterpool) look like they
// install package-level singletons, so calling newClient more than once
// presumably overwrites them — confirm against those packages.
// NOTE(review): the goroutines are started with context.Background(), so
// stopChan appears to be their only stop signal — confirm in remoting.
func newClient(cfg *Config, opts ...Option) (*Client, error) {
	// Reject a nil cfg up front instead of dereferencing it below.
	if cfg == nil || !cfg.IsValid() {
		return nil, fmt.Errorf("invalid console config, cfg=%+v", cfg)
	}

	ctx := context.Background()
	options := new(Options)
	for _, opt := range opts {
		opt(options)
	}

	// Init discovery
	openAPIClient := openapi.NewClient(
		openapi.WithHTTPClient(http.Client{Timeout: time.Second * 3}),
		openapi.WithNamespace(cfg.Namespace),
		openapi.WithGroupId(cfg.GroupId),
		openapi.WithOpenAPIDomain(cfg.DomainName),
		openapi.WithOpenAPIEndpoint(cfg.Endpoint),
		openapi.WithAppKey(cfg.AppKey),
	)
	openapi.InitOpenAPIClient(openAPIClient)
	discovery.GetGroupManager().StartServerDiscovery(cfg.GroupId, cfg.AppKey)
	serverDiscover := discovery.GetDiscovery(cfg.GroupId)

	// Init connection pool
	dialer := func() (net.Conn, error) {
		// Resolve the active server once per dial so the logged address is
		// exactly the one being dialed, even if discovery changes in between.
		addr := serverDiscover.ActiveServer()
		logger.Infof("SchedulerX discovery active server addr=%s", addr)
		return net.DialTimeout("tcp", addr, time.Millisecond*500)
	}
	singleConnPool := pool.NewSingleConnPool(ctx, dialer,
		pool.WithPostDialer(remoting.Handshake),
		pool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh()))
	pool.InitConnPool(singleConnPool)

	// Fetch a connection eagerly so a broken network fails construction.
	conn, err := singleConnPool.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot connect schedulerx server, maybe network was broken, err=%w", err)
	}
	logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String())

	taskMap := tasks.GetTaskMap()
	masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(taskMap))

	// Init actors
	actorSystem := actorcomm.GetActorSystem()
	if err := sxactor.InitActors(actorSystem); err != nil {
		return nil, fmt.Errorf("init actors faild, err=%w", err)
	}

	// Buffered by one, as signal.Notify does not block when delivering.
	stopChan := make(chan os.Signal, 1)
	signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

	// Keep heartbeat, and receive message
	// KeepHeartbeat must run after the actors are initialized, so that it can
	// get actorSystemPort from actorSystem.
	go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey, stopChan)
	go remoting.OnMsgReceived(ctx)

	return &Client{
		cfg:      cfg,
		opts:     options,
		tasks:    taskMap,
		stopChan: stopChan,
	}, nil
}