example/session_pool/table/table_session_pool_example.go (148 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package main
import (
"github.com/apache/iotdb-client-go/client"
"github.com/apache/iotdb-client-go/common"
"log"
"strconv"
"sync"
"sync/atomic"
"time"
)
func main() {
sessionPoolWithSpecificDatabaseExample()
sessionPoolWithoutSpecificDatabaseExample()
putBackToSessionPoolExample()
}
func putBackToSessionPoolExample() {
// should create database test_db before executing
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "root",
Database: "test_db",
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 4000, false)
defer sessionPool.Close()
num := 4
successGetSessionNum := int32(0)
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
dbName := "db" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create database "+dbName+"because ", err)
return
}
atomic.AddInt32(&successGetSessionNum, 1)
defer func() {
time.Sleep(6 * time.Second)
// put back to session pool
session.Close()
}()
checkError(session.ExecuteNonQueryStatement("create database " + dbName))
checkError(session.ExecuteNonQueryStatement("use " + dbName))
checkError(session.ExecuteNonQueryStatement("create table table_of_" + dbName + " (tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
log.Println("success num is", successGetSessionNum)
log.Println("All session's database have been reset.")
// the using database will automatically reset to session pool's database after the session closed
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to get session because ", err)
}
defer session.Close()
timeout := int64(3000)
dataSet, err := session.ExecuteQueryStatement("show tables", &timeout)
for {
hasNext, err := dataSet.Next()
if err != nil {
log.Fatal(err)
}
if !hasNext {
break
}
value, err := dataSet.GetString("TableName")
if err != nil {
log.Fatal(err)
}
log.Println("table is", value)
}
dataSet.Close()
}()
}
wg.Wait()
}
func sessionPoolWithSpecificDatabaseExample() {
// should create database test_db before executing
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "root",
Database: "test_db",
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 8000, false)
defer sessionPool.Close()
num := 10
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
tableName := "t" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create table "+tableName+"because ", err)
return
}
defer session.Close()
checkError(session.ExecuteNonQueryStatement("create table " + tableName + " (tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
}
func sessionPoolWithoutSpecificDatabaseExample() {
config := &client.PoolConfig{
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "root",
}
sessionPool := client.NewTableSessionPool(config, 3, 60000, 8000, false)
defer sessionPool.Close()
num := 10
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
dbName := "db" + strconv.Itoa(i)
go func() {
defer wg.Done()
session, err := sessionPool.GetSession()
if err != nil {
log.Println("Failed to create database ", dbName, err)
return
}
defer session.Close()
checkError(session.ExecuteNonQueryStatement("create database " + dbName))
checkError(session.ExecuteNonQueryStatement("use " + dbName))
checkError(session.ExecuteNonQueryStatement("create table t1 (tag1 string tag, tag2 string tag, s1 text field, s2 text field)"))
}()
}
wg.Wait()
}
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}