client/rpcdataset.go (725 lines of code) (raw):
package client
import (
"context"
"fmt"
"github.com/apache/iotdb-client-go/common"
"github.com/apache/iotdb-client-go/rpc"
"strconv"
"time"
)
const startIndex = int32(2)
type IoTDBRpcDataSet struct {
sql string
isClosed bool
client *rpc.IClientRPCServiceClient
columnNameList []string // no deduplication
columnTypeList []string // no deduplication
columnOrdinalMap map[string]int32 // used because the server returns deduplicated columns
columnName2TsBlockColumnIndexMap map[string]int32
// column index -> TsBlock column index
columnIndex2TsBlockColumnIndexList []int32
dataTypeForTsBlockColumn []TSDataType
fetchSize int32
timeout *int64
hasCachedRecord bool
lastReadWasNull bool
columnSize int32
sessionId int64
queryId int64
statementId int64
time int64
ignoreTimestamp bool
// indicates that there is still more data in server side and we can call fetchResult to get more
moreData bool
queryResult [][]byte
curTsBlock *TsBlock
queryResultSize int32 // the length of queryResult
queryResultIndex int32 // the index of bytebuffer in queryResult
tsBlockSize int32 // the size of current tsBlock
tsBlockIndex int32 // the row index in current tsBlock
zoneId *time.Location
timeFormat string
timeFactor int32
timePrecision string
}
func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypeList []string, columnNameIndex map[string]int32, ignoreTimestamp bool, moreData bool, queryId int64, statementId int64, client *rpc.IClientRPCServiceClient, sessionId int64, queryResult [][]byte, fetchSize int32, timeout *int64, zoneId string, timeFormat string, timeFactor int32, columnIndex2TsBlockColumnIndexList []int32) (rpcDataSet *IoTDBRpcDataSet, err error) {
ds := &IoTDBRpcDataSet{
sessionId: sessionId,
statementId: statementId,
ignoreTimestamp: ignoreTimestamp,
sql: sql,
queryId: queryId,
client: client,
fetchSize: fetchSize,
timeout: timeout,
moreData: moreData,
columnSize: int32(len(columnNameList)),
columnNameList: make([]string, 0, len(columnNameList)+1),
columnTypeList: make([]string, 0, len(columnTypeList)+1),
columnOrdinalMap: make(map[string]int32),
columnName2TsBlockColumnIndexMap: make(map[string]int32),
}
columnStartIndex := int32(1)
resultSetColumnSize := int32(len(columnNameList))
// newly generated or updated columnIndex2TsBlockColumnIndexList.size() may not be equal to
// columnNameList.size()
// so we need startIndexForColumnIndex2TsBlockColumnIndexList to adjust the mapping relation
startIndexForColumnIndex2TsBlockColumnIndexList := int32(0)
// for Time Column in tree model which should always be the first column and its index for
// TsBlockColumn is -1
if !ignoreTimestamp {
ds.columnNameList = append(ds.columnNameList, TimestampColumnName)
ds.columnTypeList = append(ds.columnTypeList, "INT64")
ds.columnName2TsBlockColumnIndexMap[TimestampColumnName] = -1
ds.columnOrdinalMap[TimestampColumnName] = 1
if columnIndex2TsBlockColumnIndexList != nil {
newColumnIndex2TsBlockColumnIndexList := make([]int32, 0, len(columnIndex2TsBlockColumnIndexList)+1)
newColumnIndex2TsBlockColumnIndexList = append(newColumnIndex2TsBlockColumnIndexList, -1)
newColumnIndex2TsBlockColumnIndexList = append(newColumnIndex2TsBlockColumnIndexList, columnIndex2TsBlockColumnIndexList...)
columnIndex2TsBlockColumnIndexList = newColumnIndex2TsBlockColumnIndexList
startIndexForColumnIndex2TsBlockColumnIndexList = 1
}
columnStartIndex += 1
resultSetColumnSize += 1
}
ds.columnNameList = append(ds.columnNameList, columnNameList...)
ds.columnTypeList = append(ds.columnTypeList, columnTypeList...)
if columnIndex2TsBlockColumnIndexList == nil {
columnIndex2TsBlockColumnIndexList = make([]int32, 0, resultSetColumnSize)
if !ignoreTimestamp {
startIndexForColumnIndex2TsBlockColumnIndexList = 1
columnIndex2TsBlockColumnIndexList = append(columnIndex2TsBlockColumnIndexList, -1)
}
for i := 0; i < len(columnNameList); i++ {
columnIndex2TsBlockColumnIndexList = append(columnIndex2TsBlockColumnIndexList, int32(i))
}
}
tsBlockColumnSize := int32(0)
for _, value := range columnIndex2TsBlockColumnIndexList {
if value > tsBlockColumnSize {
tsBlockColumnSize = value
}
}
tsBlockColumnSize += 1
ds.dataTypeForTsBlockColumn = make([]TSDataType, tsBlockColumnSize)
for i, columnName := range columnNameList {
tsBlockColumnIndex := columnIndex2TsBlockColumnIndexList[startIndexForColumnIndex2TsBlockColumnIndexList+int32(i)]
if tsBlockColumnIndex != -1 {
columnType, err := GetDataTypeByStr(columnTypeList[i])
if err != nil {
return nil, err
}
ds.dataTypeForTsBlockColumn[tsBlockColumnIndex] = columnType
}
if _, exists := ds.columnName2TsBlockColumnIndexMap[columnName]; !exists {
ds.columnOrdinalMap[columnName] = int32(i) + columnStartIndex
ds.columnName2TsBlockColumnIndexMap[columnName] = tsBlockColumnIndex
}
}
ds.queryResult = queryResult
if queryResult != nil {
ds.queryResultSize = int32(len(queryResult))
} else {
ds.queryResultSize = 0
}
ds.queryResultIndex = 0
ds.tsBlockSize = 0
ds.tsBlockIndex = -1
if ds.zoneId, err = time.LoadLocation(zoneId); err != nil {
return nil, err
}
ds.timeFormat = timeFormat
ds.timeFactor = timeFactor
if ds.timePrecision, err = getTimePrecision(timeFactor); err != nil {
return nil, err
}
if len(columnIndex2TsBlockColumnIndexList) != len(ds.columnNameList) {
return nil, fmt.Errorf("size of columnIndex2TsBlockColumnIndexList %v doesn't equal to size of columnNameList %v", len(columnIndex2TsBlockColumnIndexList), len(ds.columnNameList))
}
ds.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList
return ds, nil
}
func (s *IoTDBRpcDataSet) Close() (err error) {
if s.isClosed {
return nil
}
closeRequest := &rpc.TSCloseOperationReq{
SessionId: s.sessionId,
StatementId: &s.statementId,
QueryId: &s.queryId,
}
var status *common.TSStatus
status, err = s.client.CloseOperation(context.Background(), closeRequest)
if err == nil {
err = VerifySuccess(status)
}
s.client = nil
s.isClosed = true
return err
}
func (s *IoTDBRpcDataSet) Next() (result bool, err error) {
if s.hasCachedBlock() {
s.lastReadWasNull = false
err = s.constructOneRow()
return true, err
}
if s.hasCachedByteBuffer() {
if err = s.constructOneTsBlock(); err != nil {
return false, err
}
err = s.constructOneRow()
return true, err
}
if s.moreData {
hasResultSet, err := s.fetchResults()
if err != nil {
return false, err
}
if hasResultSet && s.hasCachedByteBuffer() {
if err = s.constructOneTsBlock(); err != nil {
return false, err
}
err = s.constructOneRow()
return true, err
}
}
err = s.Close()
if err != nil {
return false, err
}
return false, nil
}
func (s *IoTDBRpcDataSet) fetchResults() (bool, error) {
if s.isClosed {
return false, fmt.Errorf("this data set is already closed")
}
req := rpc.TSFetchResultsReq{
SessionId: s.sessionId,
Statement: s.sql,
FetchSize: s.fetchSize,
QueryId: s.queryId,
IsAlign: true,
}
req.Timeout = s.timeout
resp, err := s.client.FetchResultsV2(context.Background(), &req)
if err != nil {
return false, err
}
if err = VerifySuccess(resp.Status); err != nil {
return false, err
}
if !resp.HasResultSet {
err = s.Close()
} else {
s.queryResult = resp.GetQueryResult_()
s.queryResultIndex = 0
if s.queryResult != nil {
s.queryResultSize = int32(len(s.queryResult))
} else {
s.queryResultSize = 0
}
s.tsBlockSize = 0
s.tsBlockIndex = -1
}
return resp.HasResultSet, err
}
func (s *IoTDBRpcDataSet) hasCachedBlock() bool {
return s.curTsBlock != nil && s.tsBlockIndex < s.tsBlockSize-1
}
func (s *IoTDBRpcDataSet) hasCachedByteBuffer() bool {
return s.queryResult != nil && s.queryResultIndex < s.queryResultSize
}
func (s *IoTDBRpcDataSet) constructOneRow() (err error) {
s.tsBlockIndex++
s.hasCachedRecord = true
s.time, err = s.curTsBlock.GetTimeColumn().GetLong(s.tsBlockIndex)
return err
}
func (s *IoTDBRpcDataSet) constructOneTsBlock() (err error) {
s.lastReadWasNull = false
curTsBlockBytes := s.queryResult[s.queryResultIndex]
s.queryResultIndex = s.queryResultIndex + 1
s.curTsBlock, err = DeserializeTsBlock(curTsBlockBytes)
if err != nil {
return err
}
s.tsBlockIndex = -1
s.tsBlockSize = s.curTsBlock.GetPositionCount()
return nil
}
func (s *IoTDBRpcDataSet) isNullByIndex(columnIndex int32) (bool, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return false, err
} else {
return s.isNull(index, s.tsBlockIndex), nil
}
}
func (s *IoTDBRpcDataSet) isNullByColumnName(columnName string) (bool, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return false, err
} else {
return s.isNull(index, s.tsBlockIndex), nil
}
}
func (s *IoTDBRpcDataSet) isNull(index int32, rowNum int32) bool {
// -1 for time column which will never be null
return index >= 0 && s.curTsBlock.GetColumn(index).IsNull(rowNum)
}
func (s *IoTDBRpcDataSet) getBooleanByIndex(columnIndex int32) (bool, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return false, err
} else {
return s.getBooleanByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getBoolean(columnName string) (bool, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return false, err
} else {
return s.getBooleanByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getBooleanByTsBlockColumnIndex(tsBlockColumnIndex int32) (bool, error) {
if err := s.checkRecord(); err != nil {
return false, err
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetBoolean(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return false, nil
}
}
func (s *IoTDBRpcDataSet) getDoubleByIndex(columnIndex int32) (float64, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return 0, err
} else {
return s.getDoubleByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getDouble(columnName string) (float64, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return 0, err
} else {
return s.getDoubleByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getDoubleByTsBlockColumnIndex(tsBlockColumnIndex int32) (float64, error) {
if err := s.checkRecord(); err != nil {
return 0, err
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetDouble(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return 0, nil
}
}
func (s *IoTDBRpcDataSet) getFloatByIndex(columnIndex int32) (float32, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return 0, err
} else {
return s.getFloatByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getFloat(columnName string) (float32, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return 0, err
} else {
return s.getFloatByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getFloatByTsBlockColumnIndex(tsBlockColumnIndex int32) (float32, error) {
if err := s.checkRecord(); err != nil {
return 0, err
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetFloat(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return 0, nil
}
}
func (s *IoTDBRpcDataSet) getIntByIndex(columnIndex int32) (int32, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return 0, err
} else {
return s.getIntByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getInt(columnName string) (int32, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return 0, err
} else {
return s.getIntByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getIntByTsBlockColumnIndex(tsBlockColumnIndex int32) (int32, error) {
if err := s.checkRecord(); err != nil {
return 0, err
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
dataType := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetDataType()
if dataType == INT64 {
if v, err := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(s.tsBlockIndex); err != nil {
return 0, err
} else {
return int32(v), nil
}
}
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetInt(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return 0, nil
}
}
func (s *IoTDBRpcDataSet) getLongByIndex(columnIndex int32) (int64, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return 0, err
} else {
return s.getLongByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getLong(columnName string) (int64, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return 0, err
} else {
return s.getLongByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getLongByTsBlockColumnIndex(tsBlockColumnIndex int32) (int64, error) {
if err := s.checkRecord(); err != nil {
return 0, err
}
if tsBlockColumnIndex < 0 {
s.lastReadWasNull = false
return s.curTsBlock.GetTimeByIndex(s.tsBlockIndex)
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
dataType := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetDataType()
if dataType == INT32 {
if v, err := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetInt(s.tsBlockIndex); err != nil {
return 0, err
} else {
return int64(v), nil
}
}
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return 0, nil
}
}
func (s *IoTDBRpcDataSet) getBinaryByIndex(columnIndex int32) (*Binary, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return nil, err
} else {
return s.getBinaryByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getBinary(columnName string) (*Binary, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return nil, err
} else {
return s.getBinaryByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getBinaryByTsBlockColumnIndex(tsBlockColumnIndex int32) (*Binary, error) {
if err := s.checkRecord(); err != nil {
return nil, err
}
if !s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = false
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(s.tsBlockIndex)
} else {
s.lastReadWasNull = true
return nil, nil
}
}
func (s *IoTDBRpcDataSet) getObjectByIndex(columnIndex int32) (interface{}, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return nil, err
} else {
return s.getObjectByTsBlockIndex(index)
}
}
func (s *IoTDBRpcDataSet) getObject(columnName string) (interface{}, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return nil, err
} else {
return s.getObjectByTsBlockIndex(index)
}
}
func (s *IoTDBRpcDataSet) getObjectByTsBlockIndex(tsBlockColumnIndex int32) (interface{}, error) {
if err := s.checkRecord(); err != nil {
return nil, err
}
if s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = true
return nil, nil
}
s.lastReadWasNull = false
dataType := s.getDataTypeByTsBlockColumnIndex(tsBlockColumnIndex)
switch dataType {
case BOOLEAN, INT32, INT64, FLOAT, DOUBLE:
return s.curTsBlock.GetColumn(tsBlockColumnIndex).GetObject(s.tsBlockIndex)
case TIMESTAMP:
var timestamp int64
var err error
if tsBlockColumnIndex == -1 {
timestamp, err = s.curTsBlock.GetTimeByIndex(s.tsBlockIndex)
} else {
timestamp, err = s.curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(s.tsBlockIndex)
}
if err != nil {
return nil, err
}
return convertToTimestamp(timestamp, s.timeFactor), nil
case TEXT, STRING:
if binary, err := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(s.tsBlockIndex); err != nil {
return nil, err
} else {
return binary.GetStringValue(), nil
}
case BLOB:
if binary, err := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(s.tsBlockIndex); err != nil {
return nil, err
} else {
return binary.GetValues(), nil
}
case DATE:
if value, err := s.curTsBlock.GetColumn(tsBlockColumnIndex).GetInt(s.tsBlockIndex); err != nil {
return nil, err
} else {
return Int32ToDate(value)
}
default:
return nil, nil
}
}
func (s *IoTDBRpcDataSet) getStringByIndex(columnIndex int32) (string, error) {
columnIndex, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex)
if err != nil {
return "", err
}
return s.getStringByTsBlockColumnIndex(columnIndex)
}
func (s *IoTDBRpcDataSet) getString(columnName string) (string, error) {
columnIndex, err := s.getTsBlockColumnIndexForColumnName(columnName)
if err != nil {
return "", err
}
return s.getStringByTsBlockColumnIndex(columnIndex)
}
func (s *IoTDBRpcDataSet) getStringByTsBlockColumnIndex(tsBlockColumnIndex int32) (string, error) {
if err := s.checkRecord(); err != nil {
return "", err
}
// to keep compatibility, tree model should return a long value for time column
if tsBlockColumnIndex == -1 {
timestamp, err := s.curTsBlock.GetTimeByIndex(s.tsBlockIndex)
if err != nil {
return "", err
}
return int64ToString(timestamp), nil
}
if s.isNull(tsBlockColumnIndex, s.tsBlockIndex) {
s.lastReadWasNull = true
return "", nil
}
s.lastReadWasNull = false
return s.getStringByTsBlockColumnIndexAndDataType(tsBlockColumnIndex, s.getDataTypeByTsBlockColumnIndex(tsBlockColumnIndex))
}
func (s *IoTDBRpcDataSet) getStringByTsBlockColumnIndexAndDataType(index int32, tsDataType TSDataType) (string, error) {
switch tsDataType {
case BOOLEAN:
if v, err := s.curTsBlock.GetColumn(index).GetBoolean(s.tsBlockIndex); err != nil {
return "", nil
} else {
return strconv.FormatBool(v), nil
}
case INT32:
if v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex); err != nil {
return "", err
} else {
return int32ToString(v), nil
}
case INT64:
if v, err := s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex); err != nil {
return "", err
} else {
return int64ToString(v), nil
}
case TIMESTAMP:
if v, err := s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex); err != nil {
return "", err
} else {
return formatDatetime(DEFAULT_TIME_FORMAT, s.timePrecision, v, s.zoneId), nil
}
case FLOAT:
if v, err := s.curTsBlock.GetColumn(index).GetFloat(s.tsBlockIndex); err != nil {
return "", err
} else {
return float32ToString(v), nil
}
case DOUBLE:
if v, err := s.curTsBlock.GetColumn(index).GetDouble(s.tsBlockIndex); err != nil {
return "", err
} else {
return float64ToString(v), nil
}
case TEXT, STRING:
if v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex); err != nil {
return "", err
} else {
return v.GetStringValue(), nil
}
case BLOB:
if v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex); err != nil {
return "", err
} else {
return bytesToHexString(v.values), nil
}
case DATE:
v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex)
if err != nil {
return "", err
}
t, err := Int32ToDate(v)
if err != nil {
return "", err
}
return t.Format("2006-01-02"), nil
}
return "", nil
}
func (s *IoTDBRpcDataSet) getTimestampByIndex(columnIndex int32) (time.Time, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return time.Time{}, err
} else {
return s.getTimestampByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getTimestamp(columnName string) (time.Time, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return time.Time{}, err
} else {
return s.getTimestampByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) getTimestampByTsBlockColumnIndex(tsBlockColumnIndex int32) (time.Time, error) {
if value, err := s.getLongByTsBlockColumnIndex(tsBlockColumnIndex); err != nil {
return time.Time{}, err
} else {
return convertToTimestamp(value, s.timeFactor), nil
}
}
func (s *IoTDBRpcDataSet) GetDateByIndex(columnIndex int32) (time.Time, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return time.Time{}, err
} else {
return s.GetDateByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) GetDate(columnName string) (time.Time, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return time.Time{}, err
} else {
return s.GetDateByTsBlockColumnIndex(index)
}
}
func (s *IoTDBRpcDataSet) GetDateByTsBlockColumnIndex(tsBlockColumnIndex int32) (time.Time, error) {
if value, err := s.getIntByTsBlockColumnIndex(tsBlockColumnIndex); err != nil {
return time.Time{}, err
} else {
return Int32ToDate(value)
}
}
func (s *IoTDBRpcDataSet) getDataTypeByIndex(columnIndex int32) (TSDataType, error) {
if index, err := s.getTsBlockColumnIndexForColumnIndex(columnIndex); err != nil {
return UNKNOWN, err
} else {
return s.getDataTypeByTsBlockColumnIndex(index), nil
}
}
func (s *IoTDBRpcDataSet) getDataType(columnName string) (TSDataType, error) {
if index, err := s.getTsBlockColumnIndexForColumnName(columnName); err != nil {
return UNKNOWN, err
} else {
return s.getDataTypeByTsBlockColumnIndex(index), nil
}
}
func (s *IoTDBRpcDataSet) getDataTypeByTsBlockColumnIndex(tsBlockColumnIndex int32) TSDataType {
if tsBlockColumnIndex < 0 {
return TIMESTAMP
} else {
return s.dataTypeForTsBlockColumn[tsBlockColumnIndex]
}
}
func (s *IoTDBRpcDataSet) findColumn(columnName string) int32 {
return s.columnOrdinalMap[columnName]
}
func (s *IoTDBRpcDataSet) findColumnNameByIndex(columnIndex int32) (string, error) {
if columnIndex <= 0 {
return "", fmt.Errorf("column index should start from 1")
}
if columnIndex > int32(len(s.columnNameList)) {
return "", fmt.Errorf("column index %d out of range %d", columnIndex, len(s.columnNameList))
}
return s.columnNameList[columnIndex-1], nil
}
// return -1 for time column of tree model
func (s *IoTDBRpcDataSet) getTsBlockColumnIndexForColumnName(columnName string) (int32, error) {
if index, exists := s.columnName2TsBlockColumnIndexMap[columnName]; !exists {
return 0, fmt.Errorf("unknown column name: %v", columnName)
} else {
return index, nil
}
}
func (s *IoTDBRpcDataSet) getTsBlockColumnIndexForColumnIndex(columnIndex int32) (int32, error) {
if columnIndex-1 >= int32(len(s.columnIndex2TsBlockColumnIndexList)) || int32(columnIndex-1) < 0 {
return 0, fmt.Errorf("index %v out of range %v", columnIndex-1, len(s.columnIndex2TsBlockColumnIndexList))
}
return s.columnIndex2TsBlockColumnIndexList[columnIndex-1], nil
}
func (s *IoTDBRpcDataSet) checkRecord() (err error) {
if s.queryResultIndex > s.queryResultSize || s.tsBlockIndex >= s.tsBlockSize || s.queryResult == nil || s.curTsBlock == nil {
err = fmt.Errorf("no record remains")
}
return err
}
func (s *IoTDBRpcDataSet) GetValueColumnStartIndex() int32 {
if s.ignoreTimestamp {
return 0
} else {
return 1
}
}
func (s *IoTDBRpcDataSet) GetColumnSize() int32 {
return int32(len(s.columnNameList))
}
func (s *IoTDBRpcDataSet) GetColumnTypeList() []string {
return s.columnTypeList
}
func (s *IoTDBRpcDataSet) GetColumnNameTypeList() []string {
return s.columnTypeList
}
func (s *IoTDBRpcDataSet) IsClosed() bool {
return s.isClosed
}
func (s *IoTDBRpcDataSet) GetFetchSize() int32 {
return s.fetchSize
}
func (s *IoTDBRpcDataSet) SetFetchSize(fetchSize int32) {
s.fetchSize = fetchSize
}
func (s *IoTDBRpcDataSet) HasCachedRecord() bool {
return s.hasCachedRecord
}
func (s *IoTDBRpcDataSet) SetHasCachedRecord(hasCachedRecord bool) {
s.hasCachedRecord = hasCachedRecord
}
func (s *IoTDBRpcDataSet) IsLastReadWasNull() bool {
return s.lastReadWasNull
}
func (s *IoTDBRpcDataSet) GetCurrentRowTime() int64 {
return s.time
}
func (s *IoTDBRpcDataSet) IsIgnoreTimestamp() bool {
return s.ignoreTimestamp
}