gdbclient/internal/pool/pool.go (391 lines of code) (raw):
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
/**
* @author : Liu Jianping
* @date : 2019/11/29
*/
package pool
import (
"errors"
"fmt"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal"
"go.uber.org/zap"
"math"
"os"
"strings"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors used by the GDB client's connection pool.
var (
	// errPoolClosed is returned by Get, dialConn and waitForConn once Close
	// has been called on the pool.
	errPoolClosed = errors.New("GDB: connection pool closed")
	// errGetConnTimeout is returned when no connection could be borrowed
	// before the wait deadline.
	errGetConnTimeout = errors.New("GDB: get connection timeout")
	// The errors below are not referenced in this file; presumably they are
	// reported by the connection implementation (ConnWebSocket).
	errConnClosed  = errors.New("GDB: connection closed")
	errOverQueue   = errors.New("GDB: request queue is full, overhead concurrent")
	errDuplicateId = errors.New("GDB: pending duplicate request id to server")
)
// Options configures a ConnPool (see NewConnPool).
type Options struct {
// Dialer opens one new connection; the pool calls it from dialConn and tryDial.
Dialer func(*Options) (*ConnWebSocket, error)
// GdbUrl is the server address. NewConnPool overwrites it with the
// GO_CLIENT_TEST_URL environment variable when that is set.
GdbUrl string
// Username and Password are not read in this file; presumably the Dialer
// uses them to authenticate — TODO confirm.
Username string
Password string
// PoolSize is the target number of connections. It is also the number of
// dial failures after which dialing stops and tryDial takes over recovery.
PoolSize int
// PoolTimeout bounds how long Get waits for a usable connection.
PoolTimeout time.Duration
// AliveCheckInterval is the period of the background checker; <= 0 disables it.
AliveCheckInterval time.Duration
// MaxConnAge makes idle connections older than this eligible for reaping;
// 0 disables age-based recycling.
MaxConnAge time.Duration
// ReadTimeout, WriteTimeout, PingInterval and MaxInProcessPerConn are not
// read in this file; presumably they are consumed by the connection
// implementation — TODO confirm.
ReadTimeout time.Duration
WriteTimeout time.Duration
PingInterval time.Duration
MaxInProcessPerConn int
// MaxSimultaneousUsagePerConn is the borrow count at which borrowConn stops
// handing out a connection that has no in-process capacity left.
MaxSimultaneousUsagePerConn int
}
type (
	// pNotifier is the callback installed on each pooled connection via
	// setNotifier (newConn passes ConnPool.poolNotifier). It reports whether
	// an immediate pool health check was requested successfully.
	pNotifier func() bool
	// pReleaseConn is the callback installed on each pooled connection via
	// setReleaseConn (newConn passes ConnPool.Put); presumably the connection
	// invokes it to hand itself back to the pool.
	pReleaseConn func(socket *ConnWebSocket)
)
// ConnPool keeps up to poolSize websocket connections to a GDB server and
// lends them to callers, allowing several concurrent borrows per connection.
type ConnPool struct {
// opt holds the options the pool was created with.
opt *Options
// dialErrorsNum counts dial failures; it is reset only by tryDial.
dialErrorsNum uint32 // accessed only via sync/atomic
// lastDialError is the most recent dial error, guarded by lastDialErrorMu.
lastDialErrorMu sync.RWMutex
lastDialError error
// conns is the current connection list, guarded by connsMu. It may still
// contain broken connections until they are reaped or returned.
connsMu sync.RWMutex
conns []*ConnWebSocket
// poolSize is the target number of connections (opt.PoolSize).
poolSize int
// hasAvailableConn is unbuffered: announceAvailableConn does a non-blocking
// send that wakes at most one goroutine blocked in awaitAvailableConn.
hasAvailableConn chan struct{}
maxSimultaneousUsagePerConn int
// _closed is set from 0 to 1 exactly once, by Close.
_closed uint32 // accessed only via sync/atomic
// _opening counts newConn goroutines currently dialing.
_opening int32 // accessed only via sync/atomic
// closedCh is closed by Close; it stops the checker goroutine.
closedCh chan struct{}
// checkCh carries on-demand check requests from poolNotifier to checker.
checkCh chan struct{}
}
// NewConnPool creates a pool from opt and starts filling it asynchronously
// (one dialing goroutine per missing connection). When opt.AliveCheckInterval
// is positive it also starts the background checker goroutine.
//
// If the GO_CLIENT_TEST_URL environment variable is non-empty, its value
// replaces opt.GdbUrl. Note that this mutates the caller's Options.
func NewConnPool(opt *Options) *ConnPool {
	if testURL := os.Getenv("GO_CLIENT_TEST_URL"); testURL != "" {
		opt.GdbUrl = testURL
		internal.Logger.Info("GDB CLIENT IN TEST MODE")
	}

	pool := &ConnPool{
		opt:                         opt,
		poolSize:                    opt.PoolSize,
		maxSimultaneousUsagePerConn: opt.MaxSimultaneousUsagePerConn,
		conns:                       make([]*ConnWebSocket, 0, opt.PoolSize),
		hasAvailableConn:            make(chan struct{}),
		closedCh:                    make(chan struct{}),
		checkCh:                     make(chan struct{}),
		// _closed and _opening start at their zero value (open, no dials).
	}
	pool.addConns()

	if interval := opt.AliveCheckInterval; interval > 0 {
		go pool.checker(interval)
	}

	internal.Logger.Info("create pool", zap.Int("size", pool.poolSize),
		zap.Duration("get timeout", opt.PoolTimeout), zap.Duration("alive freq", opt.AliveCheckInterval),
		zap.Duration("conn max age", opt.MaxConnAge))
	return pool
}
// addConns tops the pool up toward poolSize by starting one newConn goroutine
// per missing connection. It does nothing when:
//   - a previous batch of dials is still in flight, or the pool is closed;
//   - dial failures have reached poolSize (tryDial then handles recovery).
func (p *ConnPool) addConns() {
	switch {
	case atomic.LoadInt32(&p._opening) > 0, p.closed():
		internal.Logger.Debug("pool is opening or closed")
		return
	case atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.poolSize):
		internal.Logger.Debug("dial con over number")
		return
	}

	internal.Logger.Debug("new conn async", zap.Time("time", time.Now()), zap.Int("current", p.Size()), zap.Int("target", p.poolSize))
	for missing := p.poolSize - p.Size(); missing > 0; missing-- {
		go p.newConn()
	}
}
// newConn dials a single connection and, if the pool is still open and below
// poolSize, installs the pool callbacks on it and adds it to the front of
// p.conns. It then wakes one waiting borrower. A connection that cannot be
// admitted (pool closed or already full) is closed immediately.
//
// It is run as its own goroutine by addConns. _opening counts the dials in
// flight; a goroutine that would push it above poolSize gives up without
// dialing.
func (p *ConnPool) newConn() {
	defer atomic.AddInt32(&p._opening, -1)
	if atomic.AddInt32(&p._opening, 1) > int32(p.poolSize) {
		return
	}
	cn, err := p.dialConn()
	if err != nil {
		internal.Logger.Error("dialer connect", zap.Time("time", time.Now()), zap.Error(err))
		return
	}

	admitted := false
	p.connsMu.Lock()
	// Strictly less-than: at len == poolSize the pool is already full, and
	// admitting one more would grow it to poolSize+1. That can happen when
	// concurrent addConns calls start more dials than there are free slots.
	if !p.closed() && len(p.conns) < p.poolSize {
		cn.setNotifier(p.poolNotifier)
		cn.setReleaseConn(p.Put)
		// The newest connection goes to the front of the list.
		p.conns = append([]*ConnWebSocket{cn}, p.conns...)
		admitted = true
	}
	p.connsMu.Unlock()

	if !admitted {
		internal.Logger.Debug("release conn as pool full", zap.Time("time", time.Now()), zap.Stringer("con", cn))
		cn.Close()
		return
	}
	p.announceAvailableConn()
}
// dialConn opens one new connection through opt.Dialer.
//
// It fails fast in two cases:
//   - the pool is closed: returns errPoolClosed;
//   - the dial-error counter (reset only by tryDial) has reached opt.PoolSize:
//     returns the last recorded dial error.
//
// A failed dial records its error and bumps the counter. The failure that
// brings the counter to exactly PoolSize launches the tryDial recovery
// goroutine.
func (p *ConnPool) dialConn() (*ConnWebSocket, error) {
	switch {
	case p.closed():
		return nil, errPoolClosed
	case atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize):
		return nil, p.getLastDialError()
	}

	cn, err := p.opt.Dialer(p.opt)
	if err == nil {
		return cn, nil
	}
	p.setLastDialError(err)
	if failures := atomic.AddUint32(&p.dialErrorsNum, 1); failures == uint32(p.opt.PoolSize) {
		go p.tryDial()
	}
	return nil, err
}
// tryDial is the recovery loop started by dialConn once dial failures reach
// PoolSize. Once per second it probes the server with a throw-away connection.
// On the first success it resets the dial-error counter, closes the probe and
// refills the pool via addConns. It returns as soon as the pool is closed,
// including while waiting between probes.
func (p *ConnPool) tryDial() {
	const retryInterval = time.Second
	for {
		if p.closed() {
			internal.Logger.Debug("try routine gone as pool closed")
			return
		}
		conn, err := p.opt.Dialer(p.opt)
		if err != nil {
			internal.Logger.Info("try dial conn", zap.String("host", p.opt.GdbUrl), zap.Error(err))
			p.setLastDialError(err)
			// Wait before the next probe, but react to Close immediately
			// instead of sleeping through it.
			timer := time.NewTimer(retryInterval)
			select {
			case <-p.closedCh:
				timer.Stop()
				internal.Logger.Debug("try routine gone as pool closed")
				return
			case <-timer.C:
			}
			continue
		}
		internal.Logger.Info("try to dial server success", zap.Time("time", time.Now()))
		atomic.StoreUint32(&p.dialErrorsNum, 0)
		// The probe only proves the server is reachable again; the pool's
		// own connections are dialed by addConns.
		conn.Close()
		p.addConns()
		return
	}
}
// Get borrows a connection from the pool, waiting up to opt.PoolTimeout for
// one to become usable. It returns errPoolClosed once the pool is closed and
// errGetConnTimeout if the wait expires. Borrowed connections come back
// through Put, which newConn also installs as each connection's release
// callback.
func (p *ConnPool) Get() (*ConnWebSocket, error) {
	if !p.closed() {
		return p.borrowConn(p.opt.PoolTimeout)
	}
	return nil, errPoolClosed
}
// Put hands a borrowed connection back to the pool. After Close it only logs
// errPoolClosed and leaves the connection untouched.
func (p *ConnPool) Put(cn *ConnWebSocket) {
	if !p.closed() {
		p.returnConn(cn)
		return
	}
	internal.Logger.Error("put conn", zap.Error(errPoolClosed))
}
// Size returns how many connections the pool currently holds. The count
// includes broken connections that have not been reaped or returned yet.
func (p *ConnPool) Size() int {
	p.connsMu.RLock()
	defer p.connsMu.RUnlock()
	return len(p.conns)
}
// Close shuts the pool down. Only the first call has any effect. It marks the
// pool closed and closes closedCh, which stops the checker goroutine and
// wakes tryDial. It then detaches every pooled connection and closes each one.
func (p *ConnPool) Close() {
	if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
		return
	}
	internal.Logger.Info("close pool", zap.Int("size", p.Size()))
	close(p.closedCh)

	// Detach the connections under the lock, but close them only after
	// releasing it. Closing a websocket connection presumably involves network
	// I/O (ConnWebSocket.Close is not visible here). Holding connsMu during
	// that would stall every Size/selectLeastUsed/removeConn caller.
	p.connsMu.Lock()
	conns := p.conns
	p.conns = nil
	p.connsMu.Unlock()

	for _, cn := range conns {
		p.closeConn(cn)
	}
}
// String renders a one-line snapshot of the pool for status logging. It
// includes the connection count, dials in flight, the closed flag, a dial-error
// summary and each connection's own String form.
func (p *ConnPool) String() string {
	p.connsMu.RLock()
	connStrs := make([]string, 0, len(p.conns))
	for _, cn := range p.conns {
		connStrs = append(connStrs, "{"+cn.String()+"}")
	}
	connLen := len(p.conns)
	p.connsMu.RUnlock()

	errorStr := "{}"
	// dialErrorsNum and _opening are updated concurrently by other goroutines.
	// Read them atomically, and read the counter only once, so the value that
	// passes the check is the value that gets printed.
	if errNum := atomic.LoadUint32(&p.dialErrorsNum); errNum > 0 {
		errStr := "<nil>"
		if lastErr := p.getLastDialError(); lastErr != nil {
			errStr = lastErr.Error()
		}
		errorStr = fmt.Sprintf("{errNum: %d, errStr: %s}", errNum, errStr)
	}
	return fmt.Sprintf("pool<%p> size %d, opening %d, closed %t, errors: %s, conns: [%s]",
		p, connLen, atomic.LoadInt32(&p._opening), p.closed(), errorStr, strings.Join(connStrs, ","))
}
// setLastDialError records err as the most recent dial failure.
func (p *ConnPool) setLastDialError(err error) {
	p.lastDialErrorMu.Lock()
	defer p.lastDialErrorMu.Unlock()
	p.lastDialError = err
}
// getLastDialError returns the most recently recorded dial failure, or nil if
// none has been recorded.
func (p *ConnPool) getLastDialError() error {
	p.lastDialErrorMu.RLock()
	defer p.lastDialErrorMu.RUnlock()
	return p.lastDialError
}
// closed reports whether Close has been called on the pool.
func (p *ConnPool) closed() bool {
	state := atomic.LoadUint32(&p._closed)
	return state == 1
}
// awaitAvailableConn blocks until one of three things happens:
//   - another goroutine announces a usable connection: returns true;
//   - the pool is closed: returns true, so the caller stops waiting and
//     re-checks p.closed();
//   - timeout elapses: returns false.
func (p *ConnPool) awaitAvailableConn(timeout time.Duration) bool {
	// An explicit timer that is stopped on return, rather than time.After,
	// so a timer is not left running for every early wake-up.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-p.hasAvailableConn:
		return true
	case <-p.closedCh:
		// Wake immediately instead of sleeping out the whole timeout after
		// Close; waitForConn checks p.closed() and returns errPoolClosed.
		return true
	}
}
// announceAvailableConn wakes at most one goroutine blocked in
// awaitAvailableConn. hasAvailableConn is unbuffered, so when nobody is
// waiting the signal is dropped instead of blocking the caller.
func (p *ConnPool) announceAvailableConn() {
	var signal struct{}
	select {
	case p.hasAvailableConn <- signal:
		// a waiter received the signal
	default:
		// no waiter; nothing to do
	}
}
// removeConn detaches the first occurrence of cn from the pool's connection
// list, keeping the order of the others. It does not close cn; that is the
// caller's responsibility.
func (p *ConnPool) removeConn(cn *ConnWebSocket) {
	p.connsMu.Lock()
	defer p.connsMu.Unlock()
	for i, c := range p.conns {
		if c != cn {
			continue
		}
		last := len(p.conns) - 1
		copy(p.conns[i:], p.conns[i+1:])
		// Clear the now-duplicated tail slot. Otherwise the backing array keeps
		// a stale reference that pins a removed connection in memory.
		p.conns[last] = nil
		p.conns = p.conns[:last]
		return
	}
}
// returnConn releases one borrow on conn. If conn came back broken or closed,
// it is evicted from the pool, closed, and replaced asynchronously via
// addConns. Otherwise one waiting borrower, if any, is woken.
func (p *ConnPool) returnConn(conn *ConnWebSocket) {
	atomic.AddInt32(&conn.borrowed, -1)
	internal.Logger.Debug("return conn", zapPtr(conn), zap.Time("time", time.Now()))

	if !conn.brokenOrClosed() {
		p.announceAvailableConn()
		return
	}

	internal.Logger.Debug("return broken conn", zap.Time("time", time.Now()), zap.Stringer("cn", conn))
	p.removeConn(conn)
	conn.Close()
	// Proactively dial a replacement for the evicted connection.
	p.addConns()
}
// borrowConn picks the least-busy healthy connection and claims one borrow on
// it with a compare-and-swap loop. It falls back to waitForConn (bounded by
// timeout) in two cases:
//   - no healthy connection exists;
//   - the chosen connection is at maxSimultaneousUsagePerConn borrows and
//     reports no in-process capacity.
func (p *ConnPool) borrowConn(timeout time.Duration) (*ConnWebSocket, error) {
	conn := p.selectLeastUsed()
	if conn == nil {
		internal.Logger.Debug("borrow conn nil", zap.Int("poolSize", p.Size()))
		return p.waitForConn(timeout)
	}
	maxUsage := int32(p.maxSimultaneousUsagePerConn)
	for {
		inFlight := atomic.LoadInt32(&conn.borrowed)
		available := conn.availableInProcess()
		if inFlight >= maxUsage && available == 0 {
			// Log the atomically loaded snapshot. A plain read of conn.borrowed
			// here would race with concurrent borrowers and returners.
			internal.Logger.Debug("wait conn", zapPtr(conn),
				zap.Int32("flight", inFlight), zap.Int32("availableInProcess", available))
			return p.waitForConn(timeout)
		}
		if atomic.CompareAndSwapInt32(&conn.borrowed, inFlight, inFlight+1) {
			internal.Logger.Debug("borrowed conn", zapPtr(conn), zap.Time("time", time.Now()),
				zap.Int32("flight", inFlight+1), zap.Int32("availableInProcess", available))
			return conn, nil
		}
		// The borrow count changed under us; retry with fresh values.
	}
}
// waitForConn waits, for at most timeout in total, until a borrow can be
// claimed on the least-busy healthy connection. Each round does two things:
//   - blocks in awaitAvailableConn for the remaining time;
//   - tries to take a borrow slot on the connection from selectLeastUsed,
//     going back to waiting when none qualifies.
//
// It returns errGetConnTimeout when the time runs out and errPoolClosed if the
// pool is found closed after a wake-up.
func (p *ConnPool) waitForConn(timeout time.Duration) (*ConnWebSocket, error) {
	deadline := time.Now().Add(timeout)
	remaining := timeout
	for remaining > 0 {
		internal.Logger.Debug("wait conn", zap.Time("now", time.Now()), zap.Duration("timeout", remaining))
		if woken := p.awaitAvailableConn(remaining); !woken {
			internal.Logger.Debug("wait conn timeout", zap.Time("time", time.Now()))
			return nil, errGetConnTimeout
		}
		if p.closed() {
			internal.Logger.Debug("wait conn failed as pool closed")
			return nil, errPoolClosed
		}
		if conn := p.selectLeastUsed(); conn != nil {
			for {
				inFlight := atomic.LoadInt32(&conn.borrowed)
				available := conn.availableInProcess()
				// FIXME: the Java SDK goes back to waiting whenever
				// inFlight >= availableInProcess. This port only waits again
				// when the connection has no in-process capacity at all.
				// Confirm which behavior is intended.
				if available == 0 {
					internal.Logger.Info("wait conn may timeout", zapPtr(conn),
						zap.Int32("inFlight", inFlight), zap.Int32("availableInProcess", available))
					break
				}
				if atomic.CompareAndSwapInt32(&conn.borrowed, inFlight, inFlight+1) {
					return conn, nil
				}
			}
		}
		remaining = time.Until(deadline)
	}
	return nil, errGetConnTimeout
}
// selectLeastUsed returns the healthy (not broken or closed) connection with
// the fewest outstanding borrows, or nil if there is none. Ties go to the
// connection that appears first in p.conns.
func (p *ConnPool) selectLeastUsed() *ConnWebSocket {
	p.connsMu.RLock()
	defer p.connsMu.RUnlock()

	var best *ConnWebSocket
	bestInFlight := int32(math.MaxInt32)
	for i := range p.conns {
		candidate := p.conns[i]
		inFlight := atomic.LoadInt32(&candidate.borrowed)
		if candidate.brokenOrClosed() || inFlight >= bestInFlight {
			continue
		}
		best, bestInFlight = candidate, inFlight
	}
	return best
}
// checker is the pool's background maintenance loop. It runs doCheck on every
// tick of frequency and whenever poolNotifier sends on checkCh. It logs the
// pool status at Info level on every fifth tick, starting with the first. It
// returns once closedCh is closed.
func (p *ConnPool) checker(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()

	const statusEvery = 5
	var ticks uint64
	for {
		select {
		case <-p.closedCh:
			return
		case <-p.checkCh:
			p.doCheck()
		case <-ticker.C:
			p.doCheck()
			if ticks%statusEvery == 0 {
				internal.Logger.Info("status", zap.Time("time", time.Now()), zap.Stringer("pool", p))
			}
			ticks++
		}
	}
}
// poolNotifier asks the checker goroutine to run doCheck right away, without
// ever blocking. checkCh is unbuffered, so the request succeeds (true) only
// when the checker is ready to receive it. It returns false if the pool is
// closed, the checker is busy, or no checker is running (AliveCheckInterval
// <= 0).
func (p *ConnPool) poolNotifier() bool {
	if p.closed() {
		return false
	}
	var request struct{}
	select {
	case p.checkCh <- request:
		return true
	default:
	}
	return false
}
// doCheck reaps stale connections and, if the pool is then short of poolSize,
// schedules replacements via addConns.
func (p *ConnPool) doCheck() {
	// select picks pseudo-randomly among ready cases, so checker may run a
	// tick/checkCh case even though closedCh is already closed. Re-check
	// here so no maintenance happens after Close.
	if p.closed() {
		return
	}
	missing := p.reapStaleConns()
	if missing <= 0 {
		return
	}
	internal.Logger.Debug("reaper stale conns", zap.Time("time", time.Now()), zap.Int("count", missing))
	p.addConns()
}
// reapStaleConns removes every connection for which isStaleConns is true and
// closes them outside the lock. It returns how many connections the pool is
// short of poolSize afterwards. That count also includes slots that were
// already empty before this call; a result <= 0 means no refill is needed.
func (p *ConnPool) reapStaleConns() int {
	var staleConns []*ConnWebSocket

	p.connsMu.Lock()
	// Single-pass in-place filter that preserves the order of the
	// connections that are kept.
	kept := p.conns[:0]
	for _, cn := range p.conns {
		if p.isStaleConns(cn) {
			staleConns = append(staleConns, cn)
			continue
		}
		kept = append(kept, cn)
	}
	// Clear the vacated tail of the backing array so reaped connections are
	// not kept reachable through it.
	for i := len(kept); i < len(p.conns); i++ {
		p.conns[i] = nil
	}
	p.conns = kept
	missing := p.poolSize - len(p.conns)
	p.connsMu.Unlock()

	for _, cn := range staleConns {
		internal.Logger.Debug("reap stale conn", zap.Time("time", time.Now()), zap.Stringer("str", cn))
		p.closeConn(cn)
	}
	return missing
}
// isStaleConns reports whether cn should be reaped. That is the case when it
// is broken or closed. It is also the case, if MaxConnAge is non-zero, when
// the connection is idle (zero borrows and zero pendingSize) and older than
// MaxConnAge. pendingSize presumably counts requests still awaiting a
// response — TODO confirm.
func (p *ConnPool) isStaleConns(cn *ConnWebSocket) bool {
	if cn.brokenOrClosed() {
		return true
	}
	maxAge := p.opt.MaxConnAge
	if maxAge == 0 {
		// Age-based recycling is disabled; the age check below does not apply.
		return false
	}
	// Never recycle a connection that is still in use.
	inUse := atomic.LoadInt32(&cn.borrowed) != 0 || atomic.LoadInt32(&cn.pendingSize) != 0
	if inUse {
		return false
	}
	return time.Since(cn.CreatedAt()) > maxAge
}
// closeConn closes a connection the pool has already detached from p.conns.
// Any result of ConnWebSocket.Close is discarded.
func (p *ConnPool) closeConn(conn *ConnWebSocket) {
	conn.Close()
}