client/sessionpool.go (119 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package client import ( "errors" "log" "runtime" "time" ) var errTimeout = errors.New("get session timeout") var errPoolClosed = errors.New("sessionPool has closed") var defaultMultiple = 5 type SessionPool struct { config *PoolConfig maxSize int waitToGetSessionTimeoutInMs int enableCompression bool connectionTimeoutInMs int ch chan Session sem chan int8 } type PoolConfig struct { Host string Port string NodeUrls []string UserName string Password string FetchSize int32 TimeZone string ConnectRetryMax int } func NewSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int, enableCompression bool) SessionPool { if maxSize <= 0 { maxSize = runtime.NumCPU() * defaultMultiple } var ch = make(chan Session, maxSize) var sem = make(chan int8, maxSize) return SessionPool{ config: conf, maxSize: maxSize, waitToGetSessionTimeoutInMs: waitToGetSessionTimeoutInMs, connectionTimeoutInMs: connectionTimeoutInMs, enableCompression: enableCompression, ch: ch, sem: sem, } } func (spool *SessionPool) GetSession() (session Session, err error) { for { select { case spool.sem <- 1: select { case session, ok := <-spool.ch: if ok { return session, nil } else { log.Println("sessionPool has closed") return session, errPoolClosed } default: config := spool.config session, err := spool.ConstructSession(config) return session, err } case <-time.After(time.Millisecond * time.Duration(spool.waitToGetSessionTimeoutInMs)): log.Println("get session timeout") return session, errTimeout } } } func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) { if len(config.NodeUrls) > 0 { session = NewClusterSession(getClusterSessionConfig(config)) if err := session.OpenCluster(spool.enableCompression); err != nil { log.Print(err) return session, err } } else { session = NewSession(getSessionConfig(config)) if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil { log.Print(err) return session, err } } return session, nil } func getSessionConfig(config *PoolConfig) *Config { return &Config{ Host: config.Host, Port: config.Port, UserName: config.UserName, Password: config.Password, FetchSize: config.FetchSize, TimeZone: config.TimeZone, ConnectRetryMax: config.ConnectRetryMax, } } func getClusterSessionConfig(config *PoolConfig) *ClusterConfig { return &ClusterConfig{ NodeUrls: config.NodeUrls, UserName: config.UserName, Password: config.Password, FetchSize: config.FetchSize, TimeZone: config.TimeZone, ConnectRetryMax: config.ConnectRetryMax, } } func (spool *SessionPool) PutBack(session Session) { if session.trans.IsOpen() { spool.ch <- session } <-spool.sem } func (spool *SessionPool) Close() { close(spool.ch) for s := range spool.ch { s.Close() } close(spool.sem) }