inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go (308 lines of code) (raw):

// // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package dataproxy import ( "bytes" "context" "errors" "math" "sync" "time" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/discoverer" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/framer" "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger" "github.com/panjf2000/gnet/v2" "go.uber.org/atomic" ) // variables var ( ErrInvalidGroupID = errors.New("invalid group ID") ErrInvalidURL = errors.New("invalid URL") ErrNoEndpoint = errors.New("service has no endpoints") ErrNoAvailableWorker = errors.New("no available worker") ErrInvalidMessage = errors.New("invalid message, GroupID/StreamID/Payload is empty or contains illegal characters") ) // Client is the interface of a DataProxy client type Client interface { // Send sends a message and wait for the result. Send(ctx context.Context, msg Message) error // SendAsync sends a message asynchronously, when the message is sent or timeout, the callback will be called. SendAsync(ctx context.Context, msg Message, callback Callback) // Close flushes all the message to the server and wait for the results or timeout, then close the producer. Close() } type client struct { *gnet.BuiltinEventEngine // inherited from the default event engin options *Options // config options discoverer discoverer.Discoverer // service discoverer connPool connpool.EndpointRestrictedConnPool // connection pool netClient *gnet.Client // client side network manager workers []*worker // worker to do send and receive jobs curWorkerIndex atomic.Uint64 // current worker index log logger.Logger // debug logger metrics *metrics // metrics framer framer.Framer // response framer closeOnce sync.Once // } // NewClient Creates a dataproxy-go client instance func NewClient(opts ...Option) (Client, error) { // default v1 options options := &Options{} for _, o := range opts { o(options) } err := options.ValidateAndSetDefault() if err != nil { return nil, err } // the client struct cli := &client{ options: options, log: options.Logger, } err = cli.initAll() if err != nil { cli.Close() return nil, err } return cli, nil } func (c *client) initAll() error { // the following initialization order must not be changed。 err := c.initMetrics() if err != nil { return err } err = c.initDiscoverer() if err != nil { return err } err = c.initNetClient() if err != nil { return err } err = c.initConns() if err != nil { return err } err = c.initFramer() if err != nil { return err } err = c.initWorkers() if err != nil { return err } return nil } func (c *client) initDiscoverer() error { dis, err := NewDiscoverer(c.options.URL, c.options.GroupID, c.options.UpdateInterval, c.options.Logger, c.options.Auth) if err != nil { return err } c.discoverer = dis dis.AddEventHandler(c) return nil } func (c *client) initNetClient() error { netClient, err := gnet.NewClient( c, gnet.WithLogger(c.options.Logger), gnet.WithWriteBufferCap(c.options.WriteBufferSize), gnet.WithReadBufferCap(c.options.ReadBufferSize), gnet.WithSocketSendBuffer(c.options.SocketSendBufferSize), gnet.WithSocketRecvBuffer(c.options.SocketRecvBufferSize), gnet.WithTCPKeepAlive(5*time.Minute)) if err != nil { return err } err = netClient.Start() if err != nil { return err } // save net client c.netClient = netClient return nil } func (c *client) initConns() error { epList := c.discoverer.GetEndpoints() epLen := len(epList) if epLen == 0 { return ErrNoEndpoint } endpoints := make([]string, epLen) for i := 0; i < epLen; i++ { endpoints[i] = epList[i].Addr } // minimum connection number per endpoint is 1 connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen))) pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 2048, c, c.log, c.options.MaxConnLifetime) if err != nil { return err } c.connPool = pool return nil } func (c *client) initFramer() error { fr, err := framer.NewLengthField(framer.LengthFieldCfg{ MaxFrameLen: 64 * 1024, FieldOffset: 0, FieldLength: 4, Adjustment: 0, BytesToStrip: 0, }) if err != nil { return err } c.framer = fr return nil } func (c *client) initMetrics() error { m, err := newMetrics(c.options.MetricsName, c.options.MetricsRegistry) if err != nil { return err } c.metrics = m return nil } func (c *client) initWorkers() error { c.workers = make([]*worker, 0, c.options.WorkerNum) for i := 0; i < c.options.WorkerNum; i++ { w, err := c.createWorker(i) if err != nil { return err } c.workers = append(c.workers, w) } return nil } func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) { return c.netClient.DialContext("tcp", addr, ctx) } func (c *client) Send(ctx context.Context, msg Message) error { if !msg.IsValid() { c.log.Error("invalid message", ErrInvalidGroupID) return ErrInvalidMessage } worker, err := c.getWorker() if err != nil { return ErrNoAvailableWorker } return worker.send(ctx, msg) } func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) { if !msg.IsValid() { if cb != nil { cb(msg, ErrInvalidMessage) } c.log.Error("invalid message", ErrInvalidGroupID) return } worker, err := c.getWorker() if err != nil { if cb != nil { cb(msg, ErrNoAvailableWorker) } return } worker.sendAsync(ctx, msg, cb) } func (c *client) getWorker() (*worker, error) { index := c.curWorkerIndex.Load() w := c.workers[index%uint64(len(c.workers))] c.curWorkerIndex.Add(1) if w.available() { return w, nil } c.metrics.incError(workerBusy.strCode) return nil, workerBusy } func (c *client) Close() { c.closeOnce.Do(func() { if c.discoverer != nil { c.discoverer.Close() } for _, w := range c.workers { w.close() } if c.netClient != nil { _ = c.netClient.Stop() } if c.connPool != nil { c.connPool.Close() } }) } func (c *client) createWorker(index int) (*worker, error) { return newWorker(c, index, c.options) } func (c *client) getConn() (gnet.Conn, error) { return c.connPool.Get() } func (c *client) putConn(conn gnet.Conn, err error) { c.connPool.Put(conn, err) } func (c *client) OnBoot(e gnet.Engine) gnet.Action { c.log.Info("client boot") return gnet.None } func (c *client) OnShutdown(e gnet.Engine) { c.log.Info("client shutdown") } func (c *client) OnOpen(conn gnet.Conn) ([]byte, gnet.Action) { c.log.Debug("connection opened: ", conn.RemoteAddr()) return nil, gnet.None } func (c *client) OnClose(conn gnet.Conn, err error) gnet.Action { if err != nil { c.log.Warn("connection closed: ", conn.RemoteAddr(), ", err: ", err) c.metrics.incError(errConnClosedByPeer.strCode) } // delete this conn from conn pool if c.connPool != nil { c.connPool.OnConnClosed(conn, err) } if err != nil { for _, w := range c.workers { w.onConnClosed(conn, err) } } return gnet.None } func (c *client) OnTraffic(conn gnet.Conn) (action gnet.Action) { // c.log.Debug("response received") for { total := conn.InboundBuffered() if total < heartbeatRspLen { break } buf, _ := conn.Peek(total) // if it is a heartbeat response, skip it and read the next package if bytes.Equal(buf[:heartbeatRspLen], heartbeatRsp) { _, err := conn.Discard(heartbeatRspLen) if err != nil { c.metrics.incError(errConnReadFailed.getStrCode()) c.log.Error("discard connection stream failed, err", err) // read failed, close the connection return gnet.Close } c.log.Debug("heartbeat rsp receive") continue } length, payloadOffset, payloadOffsetEnd, err := c.framer.ReadFrame(buf) if errors.Is(err, framer.ErrIncompleteFrame) { break } if err != nil { c.metrics.incError(errConnReadFailed.getStrCode()) c.log.Error("invalid packet from stream connection, close it, err:", err) // read failed, close the connection return gnet.Close } frame, _ := conn.Peek(length) _, err = conn.Discard(length) if err != nil { c.metrics.incError(errConnReadFailed.getStrCode()) c.log.Error("discard connection stream failed, err", err) // read failed, close the connection return gnet.Close } // handle response c.onResponse(frame[payloadOffset:payloadOffsetEnd]) } return gnet.None } func (c *client) onResponse(frame []byte) { rsp := batchRsp{} rsp.decode(frame) index := getWorkerIndex(rsp.workerID) if index < 0 || index >= len(c.workers) { c.log.Errorf("invalid worker index from response, index=%d", index) return } c.workers[index].onRsp(&rsp) } func (c *client) OnEndpointUpdate(all, add, del discoverer.EndpointList) { c.connPool.UpdateEndpoints(all.Addresses(), add.Addresses(), del.Addresses()) }