client/session.go (1,073 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package client
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"log"
"net"
"reflect"
"sort"
"strings"
"time"
"github.com/apache/iotdb-client-go/common"
"github.com/apache/iotdb-client-go/rpc"
"github.com/apache/thrift/lib/go/thrift"
)
const (
DefaultTimeZone = "Asia/Shanghai"
DefaultFetchSize = 1024
DefaultConnectRetryMax = 3
TreeSqlDialect = "tree"
TableSqlDialect = "table"
)
type Version string
const (
V_0_12 = Version("V_0_12")
V_0_13 = Version("V_0_13")
V_1_0 = Version("V_1_0")
DEFAULT_VERSION = V_1_0
)
var errLength = errors.New("deviceIds, times, measurementsList and valuesList's size should be equal")
type Config struct {
Host string
Port string
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
sqlDialect string
Version Version
Database string
}
type Session struct {
config *Config
client *rpc.IClientRPCServiceClient
sessionId int64
trans thrift.TTransport
requestStatementId int64
protocolFactory thrift.TProtocolFactory
endPointList []endPoint
timeFactor int32
}
type endPoint struct {
Host string
Port string
}
func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) error {
if s.config.FetchSize <= 0 {
s.config.FetchSize = DefaultFetchSize
}
if s.config.TimeZone == "" {
s.config.TimeZone = DefaultTimeZone
}
if s.config.ConnectRetryMax <= 0 {
s.config.ConnectRetryMax = DefaultConnectRetryMax
}
var err error
// in thrift 0.14.1, this func returns two values; in thrift 0.15.0, it returns one.
s.trans = thrift.NewTSocketConf(net.JoinHostPort(s.config.Host, s.config.Port), &thrift.TConfiguration{
ConnectTimeout: time.Duration(connectionTimeoutInMs) * time.Millisecond, // Use 0 for no timeout
})
// s.trans = thrift.NewTFramedTransport(s.trans) // deprecated
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
if !s.trans.IsOpen() {
err = s.trans.Open()
if err != nil {
return err
}
}
s.protocolFactory = getProtocolFactory(enableRPCCompression)
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password,
}
req.Configuration = make(map[string]string)
req.Configuration["sql_dialect"] = s.config.sqlDialect
if s.config.Version == "" {
req.Configuration["version"] = string(DEFAULT_VERSION)
} else {
req.Configuration["version"] = string(s.config.Version)
}
if s.config.Database != "" {
req.Configuration["db"] = s.config.Database
}
resp, err := s.client.OpenSession(context.Background(), &req)
if err != nil {
return err
}
s.sessionId = resp.GetSessionId()
s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
if timeFactor, err := getTimeFactor(resp); err != nil {
return err
} else {
s.timeFactor = timeFactor
}
return err
}
type ClusterConfig struct {
NodeUrls []string //ip:port
UserName string
Password string
FetchSize int32
TimeZone string
ConnectRetryMax int
sqlDialect string
Database string
}
func (s *Session) OpenCluster(enableRPCCompression bool) error {
if s.config.FetchSize <= 0 {
s.config.FetchSize = DefaultFetchSize
}
if s.config.TimeZone == "" {
s.config.TimeZone = DefaultTimeZone
}
if s.config.ConnectRetryMax <= 0 {
s.config.ConnectRetryMax = DefaultConnectRetryMax
}
var err error
s.protocolFactory = getProtocolFactory(enableRPCCompression)
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password,
}
req.Configuration = make(map[string]string)
req.Configuration["sql_dialect"] = s.config.sqlDialect
if s.config.Version == "" {
req.Configuration["version"] = string(DEFAULT_VERSION)
} else {
req.Configuration["version"] = string(s.config.Version)
}
if s.config.Database != "" {
req.Configuration["db"] = s.config.Database
}
resp, err := s.client.OpenSession(context.Background(), &req)
if err != nil {
return err
}
if timeFactor, err := getTimeFactor(resp); err != nil {
return err
} else {
s.timeFactor = timeFactor
}
s.sessionId = resp.GetSessionId()
s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
return err
}
func getProtocolFactory(enableRPCCompression bool) thrift.TProtocolFactory {
if enableRPCCompression {
return thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
} else {
return thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{})
}
}
func (s *Session) Close() error {
req := rpc.NewTSCloseSessionReq()
req.SessionId = s.sessionId
_, err := s.client.CloseSession(context.Background(), req)
if err != nil {
return err
}
return s.trans.Close()
}
/*
*set one storage group
*param
*storageGroupId: string, storage group name (starts from root)
*return
*error: correctness of operation
*/
func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error) {
r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.SetStorageGroup(context.Background(), s.sessionId, storageGroupId)
}
}
return r, err
}
/*
*delete one storage group
*param
*storageGroupId: string, storage group name (starts from root)
*return
*error: correctness of operation
*/
func (s *Session) DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error) {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, []string{storageGroupId})
}
}
return r, err
}
/*
*delete multiple storage group
*param
*storageGroupIds: []string, paths of the target storage groups
*return
*error: correctness of operation
*/
func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error) {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
}
}
return r, err
}
/*
*create single time series
*params
*path: string, complete time series path (starts from root)
*dataType: int32, data type for this time series
*encoding: int32, data type for this time series
*compressor: int32, compressing type for this time series
*return
*error: correctness of operation
*/
func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error) {
request := rpc.TSCreateTimeseriesReq{
SessionId: s.sessionId, Path: path, DataType: int32(dataType), Encoding: int32(encoding),
Compressor: int32(compressor), Attributes: attributes, Tags: tags,
}
status, err := s.client.CreateTimeseries(context.Background(), &request)
if err != nil && status == nil {
if s.reconnect() {
request.SessionId = s.sessionId
status, err = s.client.CreateTimeseries(context.Background(), &request)
}
}
return status, err
}
/*
*create single aligned time series
*params
*prefixPath: string, time series prefix path (starts from root)
*measurements: []string, sensor names
*dataTypes: []int32, data types for time series
*encodings: []int32, encodings for time series
*compressors: []int32, compressing types for time series
*measurementAlias: []string, sensor names alias
*return
*error: correctness of operation
*/
func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error) {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
}
destEncodings := make([]int32, len(encodings))
for i, e := range encodings {
destEncodings[i] = int32(e)
}
destCompressions := make([]int32, len(compressors))
for i, e := range compressors {
destCompressions[i] = int32(e)
}
request := rpc.TSCreateAlignedTimeseriesReq{
SessionId: s.sessionId,
PrefixPath: prefixPath,
Measurements: measurements,
DataTypes: destTypes,
Encodings: destEncodings,
Compressors: destCompressions,
MeasurementAlias: measurementAlias,
}
status, err := s.client.CreateAlignedTimeseries(context.Background(), &request)
if err != nil && status == nil {
if s.reconnect() {
request.SessionId = s.sessionId
status, err = s.client.CreateAlignedTimeseries(context.Background(), &request)
}
}
return status, err
}
/*
*create multiple time series
*params
*paths: []string, complete time series paths (starts from root)
*dataTypes: []int32, data types for time series
*encodings: []int32, encodings for time series
*compressors: []int32, compressing types for time series
*return
*error: correctness of operation
*/
func (s *Session) CreateMultiTimeseries(paths []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r *common.TSStatus, err error) {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
}
destEncodings := make([]int32, len(encodings))
for i, e := range encodings {
destEncodings[i] = int32(e)
}
destCompressions := make([]int32, len(compressors))
for i, e := range compressors {
destCompressions[i] = int32(e)
}
request := rpc.TSCreateMultiTimeseriesReq{
SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
Encodings: destEncodings, Compressors: destCompressions,
}
r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
}
}
return r, err
}
/*
*delete multiple time series, including data and schema
*params
*paths: []string, time series paths, which should be complete (starts from root)
*return
*error: correctness of operation
*/
func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err error) {
r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
}
}
return r, err
}
/*
*delete all startTime <= data <= endTime in multiple time series
*params
*paths: []string, time series array that the data in
*startTime: int64, start time of deletion range
*endTime: int64, end time of deletion range
*return
*error: correctness of operation
*/
func (s *Session) DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error) {
request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths, StartTime: startTime, EndTime: endTime}
r, err = s.client.DeleteData(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.DeleteData(context.Background(), &request)
}
}
return r, err
}
/*
*special case for inserting one row of String (TEXT) value
*params
*deviceId: string, time series path for device
*measurements: []string, sensor names
*values: []string, values to be inserted, for each sensor
*timestamp: int64, indicate the timestamp of the row of data
*return
*error: correctness of operation
*/
func (s *Session) InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error) {
request := rpc.TSInsertStringRecordReq{
SessionId: s.sessionId, PrefixPath: deviceId, Measurements: measurements,
Values: values, Timestamp: timestamp,
}
r, err = s.client.InsertStringRecord(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertStringRecord(context.Background(), &request)
}
}
return r, err
}
func (s *Session) GetTimeZone() (string, error) {
resp, err := s.client.GetTimeZone(context.Background(), s.sessionId)
if err != nil {
return DefaultTimeZone, err
}
return resp.TimeZone, nil
}
func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error) {
request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone: timeZone}
r, err = s.client.SetTimeZone(context.Background(), &request)
s.config.TimeZone = timeZone
return r, err
}
func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string) (*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId,
Statement: sql,
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
resp, err := s.client.ExecuteStatementV2(ctx, &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
resp, err = s.client.ExecuteStatementV2(ctx, &request)
}
}
if statusErr := VerifySuccess(resp.Status); statusErr != nil {
return nil, statusErr
}
return s.genDataSet(sql, resp)
}
func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
return s.ExecuteStatementWithContext(context.Background(), sql)
}
func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error) {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId,
Statement: sql,
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
resp, err := s.client.ExecuteStatementV2(context.Background(), &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
resp, err = s.client.ExecuteStatementV2(context.Background(), &request)
}
}
if resp.IsSetDatabase() {
s.changeDatabase(*resp.Database)
}
return resp.Status, err
}
func (s *Session) changeDatabase(database string) {
s.config.Database = database
}
func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId, Statement: sql, StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize, Timeout: timeoutMs,
}
if resp, err := s.client.ExecuteQueryStatementV2(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.ColumnIndex2TsBlockColumnIndexList)
} else {
return nil, statusErr
}
} else {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
resp, err = s.client.ExecuteQueryStatementV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
} else {
return nil, statusErr
}
}
return nil, err
}
}
func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType,
startTime *int64, endTime *int64, interval *int64,
timeoutMs *int64,
) (*SessionDataSet, error) {
request := rpc.TSAggregationQueryReq{
SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs,
}
if resp, err := s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
} else {
return nil, statusErr
}
} else {
if s.reconnect() {
request.SessionId = s.sessionId
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
} else {
return nil, statusErr
}
}
return nil, err
}
}
func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregations []common.TAggregationType,
startTime *int64, endTime *int64, interval *int64,
timeoutMs *int64, legalNodes *bool,
) (*SessionDataSet, error) {
request := rpc.TSAggregationQueryReq{
SessionId: s.sessionId, StatementId: s.requestStatementId, Paths: paths,
Aggregations: aggregations, StartTime: startTime, EndTime: endTime, Interval: interval, FetchSize: &s.config.FetchSize,
Timeout: timeoutMs, LegalPathNodes: legalNodes,
}
if resp, err := s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
} else {
return nil, statusErr
}
} else {
if s.reconnect() {
request.SessionId = s.sessionId
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
} else {
return nil, statusErr
}
}
return nil, err
}
}
func (s *Session) genTSInsertRecordReq(deviceId string, time int64,
measurements []string,
types []TSDataType,
values []interface{},
isAligned bool,
) (*rpc.TSInsertRecordReq, error) {
request := &rpc.TSInsertRecordReq{}
request.SessionId = s.sessionId
request.PrefixPath = deviceId
request.Timestamp = time
request.Measurements = measurements
request.IsAligned = &isAligned
if bys, err := valuesToBytes(types, values); err == nil {
request.Values = bys
} else {
return nil, err
}
return request, nil
}
func (s *Session) InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) {
request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, false)
if err != nil {
return nil, err
}
r, err = s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecord(context.Background(), request)
}
}
return r, err
}
func (s *Session) InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error) {
request, err := s.genTSInsertRecordReq(deviceId, timestamp, measurements, dataTypes, values, true)
if err != nil {
return nil, err
}
r, err = s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecord(context.Background(), request)
}
}
return r, err
}
type deviceData struct {
timestamps []int64
measurementsSlice [][]string
dataTypesSlice [][]TSDataType
valuesSlice [][]interface{}
isAligned bool
}
func (d *deviceData) Len() int {
return len(d.timestamps)
}
func (d *deviceData) Less(i, j int) bool {
return d.timestamps[i] < d.timestamps[j]
}
func (d *deviceData) Swap(i, j int) {
d.timestamps[i], d.timestamps[j] = d.timestamps[j], d.timestamps[i]
d.measurementsSlice[i], d.measurementsSlice[j] = d.measurementsSlice[j], d.measurementsSlice[i]
d.dataTypesSlice[i], d.dataTypesSlice[j] = d.dataTypesSlice[j], d.dataTypesSlice[i]
d.valuesSlice[i], d.valuesSlice[j] = d.valuesSlice[j], d.valuesSlice[i]
}
// InsertRecordsOfOneDevice Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
// executeBatch, we pack some insert request in batch and send them to server. If you want improve
// your performance, please see insertTablet method
// Each row is independent, which could have different insertTargetName, time, number of measurements
func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length {
return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
}
if !sorted {
sort.Sort(&deviceData{
timestamps: timestamps,
measurementsSlice: measurementsSlice,
dataTypesSlice: dataTypesSlice,
valuesSlice: valuesSlice,
})
}
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil {
return nil, err
}
}
request := &rpc.TSInsertRecordsOfOneDeviceReq{
SessionId: s.sessionId,
PrefixPath: deviceId,
Timestamps: timestamps,
MeasurementsList: measurementsSlice,
ValuesList: valuesList,
}
r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
}
}
return r, err
}
func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType, valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length || len(valuesSlice) != length {
return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
}
if !sorted {
sort.Sort(&deviceData{
timestamps: timestamps,
measurementsSlice: measurementsSlice,
dataTypesSlice: dataTypesSlice,
valuesSlice: valuesSlice,
})
}
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i], valuesSlice[i]); err != nil {
return nil, err
}
}
isAligned := true
request := &rpc.TSInsertRecordsOfOneDeviceReq{
SessionId: s.sessionId,
PrefixPath: deviceId,
Timestamps: timestamps,
MeasurementsList: measurementsSlice,
ValuesList: valuesList,
IsAligned: &isAligned,
}
r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecordsOfOneDevice(context.Background(), request)
}
}
return r, err
}
/*
*insert multiple rows of data, records are independent to each other, in other words, there's no relationship
*between those records
*
*params
*deviceIds: []string, time series paths for device
*measurements: [][]string, each element of outer list indicates measurements of a device
*dataTypes: [][]int32, each element of outer list indicates sensor data types of a device
*values: [][]interface{}, values to be inserted, for each device
*timestamps: []int64, timestamps for records
*
*/
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64,
) (r *common.TSStatus, err error) {
request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, false)
if err != nil {
return nil, err
} else {
r, err = s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecords(context.Background(), request)
}
}
return r, err
}
}
func (s *Session) InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64,
) (r *common.TSStatus, err error) {
request, err := s.genInsertRecordsReq(deviceIds, measurements, dataTypes, values, timestamps, true)
if err != nil {
return nil, err
} else {
r, err = s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertRecords(context.Background(), request)
}
}
return r, err
}
}
/*
* InsertTablets insert multiple tablets, tablets are independent to each other
*params
*tablets: []*client.Tablet, list of tablets
*/
func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
return nil, err
}
}
}
request, err := s.genInsertTabletsReq(tablets, false)
if err != nil {
return nil, err
}
r, err = s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(), request)
}
}
return r, err
}
func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error) {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
return nil, err
}
}
}
request, err := s.genInsertTabletsReq(tablets, true)
if err != nil {
return nil, err
}
r, err = s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(), request)
}
}
return r, err
}
func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus, err error) {
request := rpc.TSExecuteBatchStatementReq{
SessionId: s.sessionId,
Statements: inserts,
}
r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
}
}
return r, err
}
func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) {
request := rpc.TSRawDataQueryReq{
SessionId: s.sessionId,
Paths: paths,
FetchSize: &s.config.FetchSize,
StartTime: startTime,
EndTime: endTime,
StatementId: s.requestStatementId,
}
resp, err := s.client.ExecuteRawDataQueryV2(context.Background(), &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
resp, err = s.client.ExecuteRawDataQueryV2(context.Background(), &request)
}
}
return s.genDataSet("", resp)
}
func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId,
Statement: sql,
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
resp, err := s.client.ExecuteUpdateStatementV2(context.Background(), &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
resp, err = s.client.ExecuteUpdateStatementV2(context.Background(), &request)
}
}
return s.genDataSet(sql, resp)
}
func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp) (*SessionDataSet, error) {
var queryId int64
if resp.QueryId == nil {
queryId = 0
} else {
queryId = *resp.QueryId
}
moreData := false
if resp.MoreData != nil {
moreData = *resp.MoreData
}
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap,
queryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, nil, moreData, s.config.FetchSize, s.config.TimeZone, s.timeFactor, resp.GetColumnIndex2TsBlockColumnIndexList())
}
func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool) (*rpc.TSInsertTabletsReq, error) {
var (
length = len(tablets)
deviceIds = make([]string, length)
measurementsList = make([][]string, length)
valuesList = make([][]byte, length)
timestampsList = make([][]byte, length)
typesList = make([][]int32, length)
sizeList = make([]int32, length)
)
for index, tablet := range tablets {
deviceIds[index] = tablet.insertTargetName
measurementsList[index] = tablet.GetMeasurements()
values, err := tablet.getValuesBytes()
if err != nil {
return nil, err
}
valuesList[index] = values
timestampsList[index] = tablet.GetTimestampBytes()
typesList[index] = tablet.getDataTypes()
sizeList[index] = int32(tablet.RowSize)
}
request := rpc.TSInsertTabletsReq{
SessionId: s.sessionId,
PrefixPaths: deviceIds,
TypesList: typesList,
MeasurementsList: measurementsList,
ValuesList: valuesList,
TimestampsList: timestampsList,
SizeList: sizeList,
IsAligned: &isAligned,
}
return &request, nil
}
func (s *Session) genInsertRecordsReq(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64, isAligned bool,
) (*rpc.TSInsertRecordsReq, error) {
length := len(deviceIds)
if length != len(timestamps) || length != len(measurements) || length != len(values) {
return nil, errLength
}
request := rpc.TSInsertRecordsReq{
SessionId: s.sessionId,
PrefixPaths: deviceIds,
MeasurementsList: measurements,
Timestamps: timestamps,
IsAligned: &isAligned,
}
v := make([][]byte, length)
for i := 0; i < len(measurements); i++ {
if bys, err := valuesToBytes(dataTypes[i], values[i]); err == nil {
v[i] = bys
} else {
return nil, err
}
}
request.ValuesList = v
return &request, nil
}
func valuesToBytes(dataTypes []TSDataType, values []interface{}) ([]byte, error) {
buff := &bytes.Buffer{}
for i, t := range dataTypes {
binary.Write(buff, binary.BigEndian, byte(t))
v := values[i]
if v == nil {
return nil, fmt.Errorf("values[%d] can't be nil", i)
}
switch t {
case BOOLEAN:
switch v.(type) {
case bool:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be bool", i, v, reflect.TypeOf(v))
}
case INT32:
switch v.(type) {
case int32:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be int32", i, v, reflect.TypeOf(v))
}
case INT64, TIMESTAMP:
switch v.(type) {
case int64:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be int64", i, v, reflect.TypeOf(v))
}
case FLOAT:
switch v.(type) {
case float32:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be float32", i, v, reflect.TypeOf(v))
}
case DOUBLE:
switch v.(type) {
case float64:
binary.Write(buff, binary.BigEndian, v)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be float64", i, v, reflect.TypeOf(v))
}
case TEXT, STRING:
switch s := v.(type) {
case string:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, []byte(s))
case []byte:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, s)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be string or []byte", i, v, reflect.TypeOf(v))
}
case BLOB:
switch s := v.(type) {
case []byte:
size := len(s)
binary.Write(buff, binary.BigEndian, int32(size))
binary.Write(buff, binary.BigEndian, s)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be []byte", i, v, reflect.TypeOf(v))
}
case DATE:
switch s := v.(type) {
case time.Time:
date, err := DateToInt32(s)
if err != nil {
return nil, err
}
binary.Write(buff, binary.BigEndian, date)
default:
return nil, fmt.Errorf("values[%d] %v(%v) must be time.Time", i, v, reflect.TypeOf(v))
}
default:
return nil, fmt.Errorf("types[%d] is incorrect, it must in (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, BLOB, DATE, STRING)", i)
}
}
return buff.Bytes(), nil
}
func (s *Session) insertRelationalTablet(tablet *Tablet) (r *common.TSStatus, err error) {
if tablet.Len() == 0 {
return &common.TSStatus{Code: SuccessStatus}, nil
}
request, err := s.genTSInsertTabletReq(tablet, true, true)
if err != nil {
return nil, err
}
request.ColumnCategories = tablet.getColumnCategories()
r, err = s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablet(context.Background(), request)
}
}
return r, err
}
func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) {
if !sorted {
if err := tablet.Sort(); err != nil {
return nil, err
}
}
request, err := s.genTSInsertTabletReq(tablet, false, false)
if err != nil {
return nil, err
}
r, err = s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablet(context.Background(), request)
}
}
return r, err
}
func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error) {
if !sorted {
if err := tablet.Sort(); err != nil {
return nil, err
}
}
request, err := s.genTSInsertTabletReq(tablet, true, false)
if err != nil {
return nil, err
}
r, err = s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablet(context.Background(), request)
}
}
return r, err
}
func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool, writeToTable bool) (*rpc.TSInsertTabletReq, error) {
if values, err := tablet.getValuesBytes(); err == nil {
request := &rpc.TSInsertTabletReq{
SessionId: s.sessionId,
PrefixPath: tablet.insertTargetName,
Measurements: tablet.GetMeasurements(),
Values: values,
Timestamps: tablet.GetTimestampBytes(),
Types: tablet.getDataTypes(),
Size: int32(tablet.RowSize),
IsAligned: &isAligned,
WriteToTable: &writeToTable,
}
return request, nil
} else {
return nil, err
}
}
func (s *Session) GetSessionId() int64 {
return s.sessionId
}
func NewSession(config *Config) Session {
config.sqlDialect = TreeSqlDialect
return newSessionWithSpecifiedSqlDialect(config)
}
func newSessionWithSpecifiedSqlDialect(config *Config) Session {
endPointList := []endPoint{{
Host: config.Host,
Port: config.Port,
}}
return Session{
config: config,
endPointList: endPointList,
}
}
func NewClusterSession(clusterConfig *ClusterConfig) (Session, error) {
clusterConfig.sqlDialect = TreeSqlDialect
return newClusterSessionWithSqlDialect(clusterConfig)
}
func newClusterSessionWithSqlDialect(clusterConfig *ClusterConfig) (Session, error) {
session := Session{}
session.endPointList = make([]endPoint, len(clusterConfig.NodeUrls))
for i := 0; i < len(clusterConfig.NodeUrls); i++ {
node := endPoint{}
node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
session.endPointList[i] = node
}
var err error
for i := range session.endPointList {
ep := session.endPointList[i]
session.trans = thrift.NewTSocketConf(net.JoinHostPort(ep.Host, ep.Port), &thrift.TConfiguration{
ConnectTimeout: time.Duration(0), // Use 0 for no timeout
})
// session.trans = thrift.NewTFramedTransport(session.trans) // deprecated
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
session.trans = thrift.NewTFramedTransportConf(session.trans, &tmp_conf)
if !session.trans.IsOpen() {
err = session.trans.Open()
if err != nil {
log.Println(err)
} else {
session.config = getConfig(ep.Host, ep.Port,
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax, clusterConfig.Database, clusterConfig.sqlDialect)
break
}
}
}
if !session.trans.IsOpen() {
return session, fmt.Errorf("no server can connect")
}
return session, nil
}
func (s *Session) initClusterConn(node endPoint) error {
var err error
s.trans = thrift.NewTSocketConf(net.JoinHostPort(node.Host, node.Port), &thrift.TConfiguration{
ConnectTimeout: time.Duration(0), // Use 0 for no timeout
})
if err == nil {
// s.trans = thrift.NewTFramedTransport(s.trans) // deprecated
tmp_conf := thrift.TConfiguration{MaxFrameSize: thrift.DEFAULT_MAX_FRAME_SIZE}
s.trans = thrift.NewTFramedTransportConf(s.trans, &tmp_conf)
if !s.trans.IsOpen() {
err = s.trans.Open()
if err != nil {
return err
}
}
}
if s.config.FetchSize < 1 {
s.config.FetchSize = DefaultFetchSize
}
if s.config.TimeZone == "" {
s.config.TimeZone = DefaultTimeZone
}
if s.config.ConnectRetryMax < 1 {
s.config.ConnectRetryMax = DefaultConnectRetryMax
}
iprot := s.protocolFactory.GetProtocol(s.trans)
oprot := s.protocolFactory.GetProtocol(s.trans)
s.client = rpc.NewIClientRPCServiceClient(thrift.NewTStandardClient(iprot, oprot))
req := rpc.TSOpenSessionReq{
ClientProtocol: rpc.TSProtocolVersion_IOTDB_SERVICE_PROTOCOL_V3, ZoneId: s.config.TimeZone, Username: s.config.UserName,
Password: &s.config.Password,
}
resp, err := s.client.OpenSession(context.Background(), &req)
if err != nil {
return err
}
s.sessionId = resp.GetSessionId()
s.requestStatementId, err = s.client.RequestStatementId(context.Background(), s.sessionId)
return err
}
func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int, database string, sqlDialect string) *Config {
return &Config{
Host: host,
Port: port,
UserName: userName,
Password: passWord,
FetchSize: fetchSize,
TimeZone: timeZone,
ConnectRetryMax: connectRetryMax,
sqlDialect: sqlDialect,
Database: database,
}
}
func (s *Session) reconnect() bool {
var err error
connectedSuccess := false
for i := 0; i < s.config.ConnectRetryMax; i++ {
for i := range s.endPointList {
ep := s.endPointList[i]
err = s.initClusterConn(ep)
if err == nil {
connectedSuccess = true
break
} else {
log.Println("Connection refused:", ep)
}
}
if connectedSuccess {
break
}
}
return connectedSuccess
}
func (s *Session) SetFetchSize(fetchSize int32) {
s.config.FetchSize = fetchSize
}