gdbclient/internal/pool/conn.go (304 lines of code) (raw):
/*
* (C) 2019-present Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*/
/**
* @author : Liu Jianping
* @date : 2019/11/25
*/
package pool
import (
"encoding/json"
"fmt"
"math"
"net"
"net/http"
"reflect"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/graph"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal"
"github.com/aliyun/alibabacloud-gdb-go-sdk/gdbclient/internal/graphsonv3"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)
// noDeadline is the zero time.Time; deadline returns it when no timeout
// applies.
var noDeadline time.Time
// zapPtr builds a log field named "conn" holding the address of conn, so log
// lines that belong to the same connection can be correlated.
func zapPtr(conn *ConnWebSocket) zap.Field {
	addr := uintptr(unsafe.Pointer(conn))
	return zap.Uintptr("conn", addr)
}
// ConnWebSocket wraps a single websocket connection to the server together
// with the bookkeeping for requests that are still waiting for a response.
type ConnWebSocket struct {
	// usedAt is the last-use time in Unix seconds, accessed atomically.
	// It must stay the FIRST field: sync/atomic only guarantees 64-bit
	// alignment for the first word of an allocated struct on 32-bit
	// platforms (ARM, 386, MIPS), and a misaligned 64-bit atomic panics.
	usedAt int64 // atomic

	// netConn is the underlying websocket connection.
	netConn *websocket.Conn

	// pendingResponses maps a request id (string) to the
	// *graphsonv3.ResponseFuture waiting for that request's response.
	pendingResponses *sync.Map
	// pendingSize counts the entries in pendingResponses; accessed atomically.
	pendingSize int32
	// maxInProcess is the maximum number of in-flight requests accepted.
	maxInProcess int32

	// createdAt is the time the connection was dialed.
	createdAt time.Time
	borrowed  int32 // atomic

	// opt holds the settings the connection was created with.
	opt *Options
	// notifier, when set, is invoked after the connection is marked broken.
	notifier pNotifier
	// releaseConn, when set, is invoked by returnToPool.
	releaseConn pReleaseConn

	// _broken is set once the ping or read loop gives up on the connection.
	_broken bool
	// _closed is 1 after Close has run, 0 otherwise.
	_closed uint32 // atomic
	// closeCn is closed by Close to wake goroutines waiting on it.
	closeCn chan struct{}

	// pingErrorsNum counts consecutive failed connection checks.
	pingErrorsNum int
	// lastIoError is the error reported to pending futures when the
	// connection is closed.
	lastIoError error

	// wLock serializes data-message writes on netConn.
	wLock sync.Mutex
}
// NewConnWebSocket dials opt.GdbUrl and returns a connection whose background
// workers are already running: a reader goroutine, plus a periodic ping
// checker when opt.PingInterval is positive. The dial error is returned
// unchanged on failure.
func NewConnWebSocket(opt *Options) (*ConnWebSocket, error) {
	const ioBufferSize = 1024 * 8

	wsDialer := websocket.Dialer{
		HandshakeTimeout: 5 * time.Second,
		ReadBufferSize:   ioBufferSize,
		WriteBufferSize:  ioBufferSize,
	}
	wsConn, _, dialErr := wsDialer.Dial(opt.GdbUrl, http.Header{})
	if dialErr != nil {
		return nil, dialErr
	}

	// Turn off the operating system's TCP keep-alive on plain TCP sockets;
	// a failure to do so is logged and otherwise ignored.
	if tcpConn, isTCP := wsConn.UnderlyingConn().(*net.TCPConn); isTCP {
		if kaErr := tcpConn.SetKeepAlive(false); kaErr != nil {
			internal.Logger.Error("set keepAlive failed", zap.Error(kaErr))
		}
	}

	cn := &ConnWebSocket{
		netConn:          wsConn,
		opt:              opt,
		maxInProcess:     int32(opt.MaxInProcessPerConn),
		pendingResponses: &sync.Map{},
		closeCn:          make(chan struct{}),
		lastIoError:      errConnClosed,
		createdAt:        time.Now(),
	}
	cn.setUsedAt(time.Now())

	// Start the background workers bound to this connection.
	if opt.PingInterval > 0 {
		go cn.connCheck(opt.PingInterval)
	}
	go cn.readResponse()

	internal.Logger.Info("create connect",
		zap.String("url", opt.GdbUrl),
		zap.Int("concurrent", opt.MaxInProcessPerConn),
		zapPtr(cn),
		zap.Duration("pingInterval", opt.PingInterval))
	return cn, nil
}
// String renders a one-line summary of the connection state for logging.
//
// Timestamps use a 24-hour clock ("15" in the layout). A bare "3" would print
// a 12-hour value without an AM/PM marker, making 03:00 and 15:00 look the
// same.
func (cn *ConnWebSocket) String() string {
	const layout = "2006-01-02_15:04:05.000"
	return fmt.Sprintf("conn<%d>: createAt %s, usedAt %s, borrowed %d, pending %d,"+
		" broken %t, closed %t, pingErrorNum %d",
		uintptr(unsafe.Pointer(cn)), cn.createdAt.Format(layout),
		cn.UsedAt().Format(layout),
		atomic.LoadInt32(&cn.borrowed), atomic.LoadInt32(&cn.pendingSize),
		cn._broken, cn.closed(), cn.pingErrorsNum)
}
// Close shuts the connection down exactly once; later calls are no-ops.
//
// It closes closeCn to wake waiting goroutines, closes the websocket, and
// completes every pending future with a deliver-error response carrying
// lastIoError.
func (cn *ConnWebSocket) Close() {
	if !atomic.CompareAndSwapUint32(&cn._closed, 0, 1) {
		return
	}

	// Wake every goroutine waiting on the close signal.
	close(cn.closeCn)

	// Best effort: the socket may already be dead, so the close error
	// carries no actionable information.
	_ = cn.netConn.Close()

	// Fail all pending futures and empty the map in place. The map pointer
	// is deliberately NOT replaced: other goroutines read the field without
	// synchronization, so reassigning it would be a data race and could
	// strand entries stored in the old map after this sweep.
	// sync.Map permits calling Delete from inside Range.
	cn.pendingResponses.Range(func(key, value interface{}) bool {
		response := graphsonv3.NewErrorResponse(key.(string),
			graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER, cn.lastIoError)
		cn.pendingResponses.Delete(key)
		value.(*graphsonv3.ResponseFuture).Complete(response)
		return true
	})
	atomic.StoreInt32(&cn.pendingSize, 0)

	internal.Logger.Info("connect close", zapPtr(cn))
}
// UsedAt returns the last-use time of the connection. The value is stored as
// whole Unix seconds, so the result has one-second resolution.
func (cn *ConnWebSocket) UsedAt() time.Time {
	return time.Unix(atomic.LoadInt64(&cn.usedAt), 0)
}
// CreatedAt returns the creation time recorded for the connection.
func (cn *ConnWebSocket) CreatedAt() time.Time {
return cn.createdAt
}
// setUsedAt atomically records tm, truncated to whole Unix seconds, as the
// last-use time.
func (cn *ConnWebSocket) setUsedAt(tm time.Time) {
	seconds := tm.Unix()
	atomic.StoreInt64(&cn.usedAt, seconds)
}
// setNotifier installs the callback invoked after the connection is marked
// broken by the ping or read loop.
func (cn *ConnWebSocket) setNotifier(n pNotifier) {
cn.notifier = n
}
// setReleaseConn installs the callback that returnToPool invokes with this
// connection.
func (cn *ConnWebSocket) setReleaseConn(n pReleaseConn) {
cn.releaseConn = n
}
// returnToPool hands the connection to the release callback when one has been
// installed. It always reports true.
func (cn *ConnWebSocket) returnToPool() bool {
	if release := cn.releaseConn; release != nil {
		release(cn)
	}
	return true
}
// connCheck pings the server every frequency until the connection is closed.
// After three consecutive failed checks it marks the connection broken,
// records the error, calls the notifier (if any) and exits.
func (cn *ConnWebSocket) connCheck(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()

	for {
		select {
		case <-cn.closeCn:
			return
		case <-ticker.C:
		}

		// The tick and the close signal can become ready together, and
		// select picks between ready cases pseudo-randomly; re-check so no
		// ping is sent after Close.
		if cn.closed() {
			return
		}

		err := cn.doping(3)
		if err == nil {
			cn.pingErrorsNum = 0
			continue
		}

		cn.pingErrorsNum++
		internal.Logger.Error("status check", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
		if cn.pingErrorsNum < 3 {
			continue
		}

		cn._broken = true
		cn.lastIoError = err
		// Let the pool know it should re-examine this connection.
		if cn.notifier != nil {
			cn.notifier()
		}
		internal.Logger.Error("conn ping broken", zapPtr(cn), zap.Time("time", time.Now()))
		return
	}
}
// doping sends a websocket ping, trying up to retry times with a one-second
// pause between attempts. It returns nil as soon as one ping is written, and
// otherwise the error of the last attempt (nil when no attempt was made
// because the connection was already broken or closed).
//
// There is no pause after the final attempt, and a pause ends early when the
// connection is closed, so neither failure reporting nor shutdown waits for a
// needless second.
func (cn *ConnWebSocket) doping(retry int) error {
	var err error
	for i := 0; i < retry && !cn.brokenOrClosed(); i++ {
		err = cn.netConn.WriteControl(websocket.PingMessage, []byte{}, cn.deadline(cn.opt.WriteTimeout))
		if err == nil {
			return nil
		}
		internal.Logger.Debug("ping failed", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))

		// Nothing follows the last attempt, so do not wait after it.
		if i == retry-1 {
			break
		}

		pause := time.NewTimer(time.Second)
		select {
		case <-pause.C:
		case <-cn.closeCn:
			pause.Stop()
			return err
		}
	}
	return err
}
// broken reports whether the connection has been marked broken.
// NOTE(review): _broken is read without synchronization here while the ping
// and read goroutines write it — confirm this is acceptable.
func (cn *ConnWebSocket) broken() bool {
return cn._broken
}
// closed reports whether Close has already run on this connection.
func (cn *ConnWebSocket) closed() bool {
	state := atomic.LoadUint32(&cn._closed)
	return state == 1
}
// brokenOrClosed reports whether the connection can no longer be used, either
// because it was marked broken or because it was closed.
func (cn *ConnWebSocket) brokenOrClosed() bool {
	if cn._broken {
		return true
	}
	return cn.closed()
}
// availableInProcess returns how many more requests the connection accepts
// before reaching maxInProcess, never less than zero.
func (cn *ConnWebSocket) availableInProcess() int32 {
	inFlight := atomic.LoadInt32(&cn.pendingSize)
	free := float64(cn.maxInProcess - inFlight)
	return int32(math.Max(0, free))
}
// readResponse is the reader loop of the connection. It reads and decodes one
// message at a time and dispatches it to handleResponse, until the connection
// is broken or closed. More than ten consecutive read or decode errors mark
// the connection broken, record the error and call the notifier (if any).
func (cn *ConnWebSocket) readResponse() {
	consecutiveErrors := 0

	for !cn.brokenOrClosed() {
		var (
			payload  []byte
			response *graphsonv3.Response
		)

		// Blocking read with no deadline; closing the socket unblocks it.
		err := cn.netConn.SetReadDeadline(cn.deadline(0))
		if err == nil {
			_, payload, err = cn.netConn.ReadMessage()
		}
		if err == nil {
			response, err = graphsonv3.ReadResponse(payload)
		}

		// Dispatch whatever was decoded, even when an error came with it.
		if response != nil {
			cn.handleResponse(response)
		}

		if err == nil {
			consecutiveErrors = 0
			continue
		}

		consecutiveErrors++
		if consecutiveErrors <= 10 {
			continue
		}

		cn._broken = true
		cn.lastIoError = err
		if cn.notifier != nil {
			cn.notifier()
		}
		internal.Logger.Error("conn read broken", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
		return
	}

	internal.Logger.Info("conn read routine exit", zapPtr(cn), zap.Time("time", time.Now()))
}
// handleResponse routes one decoded server message.
//
// An authentication challenge is answered by sending an auth request that
// reuses the challenged request id. If that reply cannot be built or
// submitted, the pending request is failed with a deliver-error response
// instead of being left without any completion.
//
// Any other message is merged into the pending future with the same request
// id: the status code is overwritten and the data is set, appended (raw JSON
// chunks) or replaced by an error. The future is completed and removed from
// the pending set once a message that is not partial content arrives.
func (cn *ConnWebSocket) handleResponse(response *graphsonv3.Response) {
	if response.Code == graphsonv3.RESPONSE_STATUS_AUTHENTICATE {
		request, err := graphsonv3.MakeAuthRequest(response.RequestID, cn.opt.Username, cn.opt.Password)
		if err == nil {
			// The returned future is not needed: the server's answer is
			// delivered to the future already pending under this id.
			_, err = cn.SubmitRequestAsync(request)
		}
		if err != nil {
			internal.Logger.Error("auth request send", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
			// Without an auth reply the server does not answer the original
			// request, so fail it here.
			if pending, ok := cn.pendingResponses.Load(response.RequestID); ok {
				cn.pendingResponses.Delete(response.RequestID)
				atomic.AddInt32(&cn.pendingSize, -1)
				pending.(*graphsonv3.ResponseFuture).Complete(graphsonv3.NewErrorResponse(response.RequestID,
					graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER, err))
			}
		}
		return
	}

	future, ok := cn.pendingResponses.Load(response.RequestID)
	if !ok {
		internal.Logger.Error("handle response not found", zap.Time("time", time.Now()), zap.String("id", response.RequestID))
		return
	}

	responseFuture := future.(*graphsonv3.ResponseFuture)
	responseFuture.FixResponse(func(respChan *graphsonv3.Response) {
		respChan.Code = response.Code
		if respChan.Data == nil {
			respChan.Data = response.Data
			return
		}

		switch newData := response.Data.(type) {
		case json.RawMessage:
			// A further raw chunk: turn the accumulated data into a slice
			// on the second chunk, append to the slice afterwards.
			switch data := respChan.Data.(type) {
			case json.RawMessage:
				dataList := make([]json.RawMessage, 2, 8)
				dataList[0] = data
				dataList[1] = newData
				respChan.Data = dataList
			case []json.RawMessage:
				respChan.Data = append(data, newData)
			default:
				// FIXME: a raw chunk arrived but the accumulated data has a
				// type it cannot be appended to; the chunk is dropped.
				internal.Logger.Error("incoming rawMessage after", zap.Time("time", time.Now()), zap.Stringer("data", reflect.TypeOf(respChan.Data)))
			}
		case error:
			// FIXME: keep the first error; an error replaces earlier
			// non-error data.
			if _, isErr := respChan.Data.(error); !isErr {
				respChan.Data = newData
			}
			internal.Logger.Debug("incoming error after", zap.Time("time", time.Now()), zap.Stringer("data", reflect.TypeOf(respChan.Data)))
		default:
			internal.Logger.Error("ignore incoming message", zap.Time("time", time.Now()), zap.Stringer("data", reflect.TypeOf(response.Data)))
		}
	})

	if response.Code == graphsonv3.RESPONSE_STATUS_PARITAL_CONTENT {
		// More chunks follow; keep the future pending.
		return
	}

	// The response is complete: drop it from the pending set and signal the
	// waiter.
	cn.pendingResponses.Delete(response.RequestID)
	atomic.AddInt32(&cn.pendingSize, -1)
	responseFuture.Complete(nil)
	if (response.Code != graphsonv3.RESPONSE_STATUS_SUCCESS) && (response.Code != graphsonv3.RESPONSE_STATUS_NO_CONTENT) {
		internal.Logger.Debug("response", zap.Time("time", time.Now()), zap.Int("code", response.Code),
			zap.String("error", fmt.Sprint(response.Data)))
	}
}
// deadline marks the connection as used now and converts timeout into an
// absolute deadline. A timeout that is zero or negative yields noDeadline.
func (cn *ConnWebSocket) deadline(timeout time.Duration) time.Time {
	now := time.Now()
	cn.setUsedAt(now)
	if timeout <= 0 {
		return noDeadline
	}
	return now.Add(timeout)
}
// SubmitRequestAsync serializes request, registers a future under its request
// id and writes it to the server.
//
// It returns (nil, errConnClosed) when the connection is broken or closed and
// (nil, errOverQueue) when the in-flight limit is reached. Every other
// failure (serialization, duplicate id, network write) is reported through a
// completed future carrying an error response, together with a nil error.
//
// Authentication requests are special: they reuse the id of the request that
// was challenged. They are therefore exempt from the in-flight limit (they
// occupy no new slot, and rejecting them would leave the challenged request
// unanswered), and when their write fails it is the challenged request's
// pending future that is completed with the error. In that piggy-back case
// the returned future is only a placeholder and is never completed.
func (cn *ConnWebSocket) SubmitRequestAsync(request *graphsonv3.Request) (*graphsonv3.ResponseFuture, error) {
	if cn.brokenOrClosed() {
		internal.Logger.Error("request send close", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(errConnClosed))
		return nil, errConnClosed
	}

	isAuth := request.Op == graph.OPS_AUTHENTICATION
	if !isAuth && atomic.LoadInt32(&cn.pendingSize) >= cn.maxInProcess {
		internal.Logger.Error("conn", zap.Stringer("cn", cn))
		internal.Logger.Error("request send over", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(errOverQueue))
		return nil, errOverQueue
	}

	future := graphsonv3.NewResponseFuture(request, cn.returnToPool)

	// Serialize the request.
	outBuf, err := graphsonv3.SerializerRequest(request)
	if err != nil {
		response := graphsonv3.NewErrorResponse(request.RequestID,
			graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_SERIALIZATION, err)
		future.Complete(response)
		internal.Logger.Error("request send serializer", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
		return future, nil
	}

	// pending is the future actually registered under this request id. It
	// differs from future only for an auth request piggy-backing on an
	// already pending request.
	pending := future
	if actual, loaded := cn.pendingResponses.LoadOrStore(request.RequestID, future); loaded {
		if !isAuth {
			// The id is already in flight: reject the duplicate.
			response := graphsonv3.NewErrorResponse(request.RequestID,
				graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER,
				errDuplicateId)
			internal.Logger.Error("request duplicate", zap.Time("time", time.Now()), zap.String("id ", request.RequestID))
			future.Complete(response)
			return future, nil
		}
		pending = actual.(*graphsonv3.ResponseFuture)
	} else {
		atomic.AddInt32(&cn.pendingSize, 1)
	}

	// Send the request; wLock keeps concurrent data writes from interleaving.
	cn.wLock.Lock()
	if err = cn.netConn.SetWriteDeadline(cn.deadline(cn.opt.WriteTimeout)); err == nil {
		err = cn.netConn.WriteMessage(websocket.BinaryMessage, outBuf)
	}
	cn.wLock.Unlock()

	// On a write failure, unregister the request and report the error to
	// whoever is waiting on the registered future.
	if err != nil {
		response := graphsonv3.NewErrorResponse(request.RequestID,
			graphsonv3.RESPONSE_STATUS_REQUEST_ERROR_DELIVER, err)
		cn.pendingResponses.Delete(request.RequestID)
		atomic.AddInt32(&cn.pendingSize, -1)
		pending.Complete(response)
		internal.Logger.Error("request send io", zapPtr(cn), zap.Time("time", time.Now()), zap.Error(err))
	}
	return future, nil
}