client/tablesessionpool.go (69 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package client import ( "github.com/apache/iotdb-client-go/common" "log" "sync/atomic" ) // TableSessionPool manages a pool of ITableSession instances, enabling efficient // reuse and management of resources. It provides methods to acquire a session // from the pool and to close the pool, releasing all held resources. // // This implementation ensures proper lifecycle management of sessions, // including efficient reuse and cleanup of resources. type TableSessionPool struct { sessionPool SessionPool } // NewTableSessionPool creates a new TableSessionPool with the specified configuration. // // Parameters: // - conf: PoolConfig defining the configuration for the pool. // - maxSize: The maximum number of sessions the pool can hold. // - connectionTimeoutInMs: Timeout for establishing a connection in milliseconds. // - waitToGetSessionTimeoutInMs: Timeout for waiting to acquire a session in milliseconds. // - enableCompression: A boolean indicating whether to enable compression. // // Returns: // - A TableSessionPool instance. func NewTableSessionPool(conf *PoolConfig, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs int, enableCompression bool) TableSessionPool { return TableSessionPool{sessionPool: newSessionPoolWithSqlDialect(conf, maxSize, connectionTimeoutInMs, waitToGetSessionTimeoutInMs, enableCompression, TableSqlDialect)} } // GetSession acquires an ITableSession instance from the pool. // // Returns: // - A usable ITableSession instance for interacting with IoTDB. // - An error if a session cannot be acquired. func (spool *TableSessionPool) GetSession() (ITableSession, error) { return spool.sessionPool.getTableSession() } // Close closes the TableSessionPool, releasing all held resources. // Once closed, no further sessions can be acquired from the pool. func (spool *TableSessionPool) Close() { spool.sessionPool.Close() } // PooledTableSession represents a session managed within a TableSessionPool. // It ensures proper cleanup and reusability of the session. type PooledTableSession struct { session Session sessionPool *SessionPool closed int32 } // Insert inserts a Tablet into the database. // // Parameters: // - tablet: A pointer to a Tablet containing time-series data to be inserted. // // Returns: // - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation. func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) { r, err = s.session.insertRelationalTablet(tablet) if err == nil { return } s.sessionPool.dropSession(s.session) atomic.StoreInt32(&s.closed, 1) s.session = Session{} return } // ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL or DML command. // // Parameters: // - sql: The SQL statement to execute. // // Returns: // - r: A pointer to TSStatus indicating the execution result. // - err: An error if an issue occurs during the operation. func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) { r, err = s.session.ExecuteNonQueryStatement(sql) if err == nil { return } s.sessionPool.dropSession(s.session) atomic.StoreInt32(&s.closed, 1) s.session = Session{} return } // ExecuteQueryStatement executes a query SQL statement and returns the result set. // // Parameters: // - sql: The SQL query statement to execute. // - timeoutInMs: A pointer to the timeout duration in milliseconds for query execution. // // Returns: // - result: A pointer to SessionDataSet containing the query results. // - err: An error if an issue occurs during the operation. func (s *PooledTableSession) ExecuteQueryStatement(sql string, timeoutInMs *int64) (*SessionDataSet, error) { sessionDataSet, err := s.session.ExecuteQueryStatement(sql, timeoutInMs) if err == nil { return sessionDataSet, nil } s.sessionPool.dropSession(s.session) atomic.StoreInt32(&s.closed, 1) s.session = Session{} return nil, err } // Close closes the PooledTableSession, releasing it back to the pool. // // Returns: // - err: An error if there is an issue with session closure or cleanup. func (s *PooledTableSession) Close() error { if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { if s.session.config.Database != s.sessionPool.config.Database && s.sessionPool.config.Database != "" { r, err := s.session.ExecuteNonQueryStatement("use " + s.sessionPool.config.Database) if r.Code == ExecuteStatementError || err != nil { log.Println("Failed to change back database by executing: use ", s.sessionPool.config.Database) s.session.Close() return nil } } } s.sessionPool.PutBack(s.session) s.session = Session{} return nil }