client/sessionpool.go (160 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package client import ( "errors" "log" "runtime" "time" ) var errTimeout = errors.New("get session timeout") var errPoolClosed = errors.New("sessionPool has closed") var defaultMultiple = 5 type SessionPool struct { config *PoolConfig maxSize int waitToGetSessionTimeoutInMs int enableCompression bool connectionTimeoutInMs int ch chan Session sem chan int8 } type PoolConfig struct { Host string Port string NodeUrls []string UserName string Password string FetchSize int32 TimeZone string ConnectRetryMax int Database string sqlDialect string } func NewSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int, enableCompression bool) SessionPool { return newSessionPoolWithSqlDialect(conf, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs, enableCompression, TreeSqlDialect) } func newSessionPoolWithSqlDialect(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int, enableCompression bool, sqlDialect string) SessionPool { conf.sqlDialect = sqlDialect if maxSize <= 0 { maxSize = runtime.NumCPU() * defaultMultiple } var ch = make(chan Session, maxSize) var sem = make(chan int8, maxSize) return SessionPool{ config: conf, maxSize: maxSize, waitToGetSessionTimeoutInMs: waitToGetSessionTimeoutInMs, connectionTimeoutInMs: connectionTimeoutInMs, enableCompression: enableCompression, ch: ch, sem: sem, } } func (spool *SessionPool) GetSession() (session Session, err error) { for { select { case spool.sem <- 1: select { case session, ok := <-spool.ch: if ok { return session, nil } else { log.Println("sessionPool has closed") return session, errPoolClosed } default: config := spool.config session, err := spool.ConstructSession(config) return session, err } case <-time.After(time.Millisecond * time.Duration(spool.waitToGetSessionTimeoutInMs)): log.Println("get session timeout") return session, errTimeout } } } func (spool *SessionPool) getTableSession() (ITableSession, error) { tableSession := PooledTableSession{} session, err := spool.GetSession() if err != nil { return nil, err } tableSession.session = session tableSession.sessionPool = spool return &tableSession, nil } func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) { if len(config.NodeUrls) > 0 { session, err = newClusterSessionWithSqlDialect(getClusterSessionConfig(config)) if err != nil { return session, err } if err = session.OpenCluster(spool.enableCompression); err != nil { log.Print(err) return session, err } } else { session = newSessionWithSpecifiedSqlDialect(getSessionConfig(config)) if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil { log.Print(err) return session, err } } return session, nil } func getSessionConfig(config *PoolConfig) *Config { return &Config{ Host: config.Host, Port: config.Port, UserName: config.UserName, Password: config.Password, FetchSize: config.FetchSize, TimeZone: config.TimeZone, ConnectRetryMax: config.ConnectRetryMax, sqlDialect: config.sqlDialect, Database: config.Database, } } func getClusterSessionConfig(config *PoolConfig) *ClusterConfig { return &ClusterConfig{ NodeUrls: config.NodeUrls, UserName: config.UserName, Password: config.Password, FetchSize: config.FetchSize, TimeZone: config.TimeZone, ConnectRetryMax: config.ConnectRetryMax, sqlDialect: config.sqlDialect, Database: config.Database, } } func (spool *SessionPool) PutBack(session Session) { defer func() { if r := recover(); r != nil { session.Close() } }() if session.trans.IsOpen() { spool.ch <- session } <-spool.sem } func (spool *SessionPool) dropSession(session Session) { defer func() { if e := recover(); e != nil { session.Close() } }() err := session.Close() if err != nil { log.Println("Failed to close session ", session) } <-spool.sem } func (spool *SessionPool) Close() { close(spool.ch) for s := range spool.ch { s.Close() } close(spool.sem) }