client.go (132 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package schedulerx import ( "context" "fmt" "net" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/alibaba/schedulerx-worker-go/config" sxactor "github.com/alibaba/schedulerx-worker-go/internal/actor" actorcomm "github.com/alibaba/schedulerx-worker-go/internal/actor/common" "github.com/alibaba/schedulerx-worker-go/internal/discovery" "github.com/alibaba/schedulerx-worker-go/internal/masterpool" "github.com/alibaba/schedulerx-worker-go/internal/openapi" "github.com/alibaba/schedulerx-worker-go/internal/remoting" "github.com/alibaba/schedulerx-worker-go/internal/remoting/pool" "github.com/alibaba/schedulerx-worker-go/internal/tasks" "github.com/alibaba/schedulerx-worker-go/logger" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/tracer" ) var ( client *Client err error once sync.Once ) type Client struct { cfg *Config opts *Options tasks *tasks.TaskMap stopChan chan os.Signal } func (c *Client) RegisterTask(name string, task processor.Processor) { c.tasks.Register(name, task) } // Shutdown gracefully stop the client func (c *Client) Shutdown() { c.stopChan <- syscall.SIGINT time.Sleep(time.Second * 5) } type Config struct { DomainName string `json:"DomainName"` Endpoint string `json:"Endpoint"` Namespace string `json:"Namespace"` // GroupId may be exited multiple, separated by comma, such as "group1,group2" GroupId string `json:"GroupId"` // AppKey may be exited multiple, separated by comma, such as "appKey1,appKey2", // appKey and groupId are in one-to-one correspondence AppKey string `json:"AppKey"` } func (c *Config) IsValid() bool { if c == nil { return false } if c.Namespace == "" || c.GroupId == "" || c.AppKey == "" { return false } if c.Endpoint == "" && c.DomainName == "" { return false } return true } type Options struct{} type Option func(*Options) func WithWorkerConfig(cfg *config.WorkerConfig) Option { return func(opt *Options) { config.InitWorkerConfig(cfg) } } func WithTracer(t tracer.Tracer) Option { return func(opt *Options) { tracer.InitTracer(t) } } func GetClient(cfg *Config, opts ...Option) (*Client, error) { once.Do(func() { client, err = newClient(cfg, opts...) }) return client, err } func newClient(cfg *Config, opts ...Option) (*Client, error) { if !cfg.IsValid() { return nil, fmt.Errorf("invalid console config, cfg=%+v", cfg) } ctx := context.Background() options := new(Options) for _, opt := range opts { opt(options) } // Init discovery openAPIClient := openapi.NewClient( openapi.WithHTTPClient(http.Client{Timeout: time.Second * 3}), openapi.WithNamespace(cfg.Namespace), openapi.WithGroupId(cfg.GroupId), openapi.WithOpenAPIDomain(cfg.DomainName), openapi.WithOpenAPIEndpoint(cfg.Endpoint), openapi.WithAppKey(cfg.AppKey), ) openapi.InitOpenAPIClient(openAPIClient) discovery.GetGroupManager().StartServerDiscovery(cfg.GroupId, cfg.AppKey) serverDiscover := discovery.GetDiscovery(cfg.GroupId) getActiveServer := func() string { return serverDiscover.ActiveServer() } // Init connection pool dialer := func() (net.Conn, error) { logger.Infof("SchedulerX discovery active server addr=%s", getActiveServer()) return net.DialTimeout("tcp", getActiveServer(), time.Millisecond*500) } singleConnPool := pool.NewSingleConnPool(ctx, dialer, pool.WithPostDialer(remoting.Handshake), pool.WithAddrChangedSignalCh(serverDiscover.ResultChangedCh())) pool.InitConnPool(singleConnPool) if conn, err := singleConnPool.Get(ctx); err != nil { return nil, fmt.Errorf("cannot connect schedulerx server, maybe network was broken, err=%s", err.Error()) } else { logger.Infof("SchedulerX server connected, remoteAddr=%s, localAddr=%s", conn.RemoteAddr(), conn.LocalAddr().String()) } taskMap := tasks.GetTaskMap() masterpool.InitTaskMasterPool(masterpool.NewTaskMasterPool(taskMap)) // Init actors actorSystem := actorcomm.GetActorSystem() if err = sxactor.InitActors(actorSystem); err != nil { return nil, fmt.Errorf("init actors faild, err=%s", err.Error()) } stopChan := make(chan os.Signal, 1) signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM) // Keep heartbeat, and receive message // KeepHeartbeat must after init actors, so that can get actorSystemPort from actorSystem go remoting.KeepHeartbeat(ctx, actorSystem, cfg.AppKey, stopChan) go remoting.OnMsgReceived(ctx) return &Client{ cfg: cfg, opts: options, tasks: taskMap, stopChan: stopChan, }, nil }