client/column_decoder.go (188 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package client import ( "bytes" "encoding/binary" "fmt" ) type ColumnDecoder interface { ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) } func deserializeNullIndicators(reader *bytes.Reader, positionCount int32) ([]bool, error) { b, err := reader.ReadByte() if err != nil { return nil, err } mayHaveNull := b != 0 if !mayHaveNull { return nil, nil } return deserializeBooleanArray(reader, positionCount) } func deserializeBooleanArray(reader *bytes.Reader, size int32) ([]bool, error) { packedSize := (size + 7) / 8 packedBytes := make([]byte, packedSize) _, err := reader.Read(packedBytes) if err != nil { return nil, err } // read null bits 8 at a time output := make([]bool, size) currentByte := 0 fullGroups := int(size) & ^0b111 for pos := 0; pos < fullGroups; pos += 8 { b := packedBytes[currentByte] currentByte++ output[pos+0] = (b & 0b10000000) != 0 output[pos+1] = (b & 0b01000000) != 0 output[pos+2] = (b & 0b00100000) != 0 output[pos+3] = (b & 0b00010000) != 0 output[pos+4] = (b & 0b00001000) != 0 output[pos+5] = (b & 0b00000100) != 0 output[pos+6] = (b & 0b00000010) != 0 output[pos+7] = (b & 0b00000001) != 0 } // read last null bits if remaining := int(size) % 8; remaining > 0 { b := packedBytes[len(packedBytes)-1] mask := uint8(0b10000000) for pos := fullGroups; pos < int(size); pos++ { output[pos] = (b & mask) != 0 mask >>= 1 } } return output, nil } type baseColumnDecoder struct{} type Int32ArrayColumnDecoder struct { baseColumnDecoder } func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { // Serialized data layout: // +---------------+-----------------+-------------+ // | may have null | null indicators | values | // +---------------+-----------------+-------------+ // | byte | list[byte] | list[int32] | // +---------------+-----------------+-------------+ nullIndicators, err := deserializeNullIndicators(reader, positionCount) if err != nil { return nil, err } switch dataType { case INT32, DATE: intValues := make([]int32, positionCount) for i := int32(0); i < positionCount; i++ { if nullIndicators != nil && nullIndicators[i] { continue } err := binary.Read(reader, binary.BigEndian, &intValues[i]) if err != nil { return nil, err } } return NewIntColumn(0, positionCount, nullIndicators, intValues) case FLOAT: floatValues := make([]float32, positionCount) for i := int32(0); i < positionCount; i++ { if nullIndicators != nil && nullIndicators[i] { continue } err := binary.Read(reader, binary.BigEndian, &floatValues[i]) if err != nil { return nil, err } } return NewFloatColumn(0, positionCount, nullIndicators, floatValues) } return nil, fmt.Errorf("invalid data type: %v", dataType) } type Int64ArrayColumnDecoder struct { baseColumnDecoder } func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { // Serialized data layout: // +---------------+-----------------+-------------+ // | may have null | null indicators | values | // +---------------+-----------------+-------------+ // | byte | list[byte] | list[int64] | // +---------------+-----------------+-------------+ nullIndicators, err := deserializeNullIndicators(reader, positionCount) if err != nil { return nil, err } switch dataType { case INT64, TIMESTAMP: values := make([]int64, positionCount) for i := int32(0); i < positionCount; i++ { if nullIndicators != nil && nullIndicators[i] { continue } if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil { return nil, err } } return NewLongColumn(0, positionCount, nullIndicators, values) case DOUBLE: values := make([]float64, positionCount) for i := int32(0); i < positionCount; i++ { if nullIndicators != nil && nullIndicators[i] { continue } if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil { return nil, err } } return NewDoubleColumn(0, positionCount, nullIndicators, values) } return nil, fmt.Errorf("invalid data type: %v", dataType) } type ByteArrayColumnDecoder struct { baseColumnDecoder } func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { // Serialized data layout: // +---------------+-----------------+-------------+ // | may have null | null indicators | values | // +---------------+-----------------+-------------+ // | byte | list[byte] | list[byte] | // +---------------+-----------------+-------------+ if dataType != BOOLEAN { return nil, fmt.Errorf("invalid data type: %v", dataType) } nullIndicators, err := deserializeNullIndicators(reader, positionCount) if err != nil { return nil, err } values, err := deserializeBooleanArray(reader, positionCount) if err != nil { return nil, err } return NewBooleanColumn(0, positionCount, nullIndicators, values) } type BinaryArrayColumnDecoder struct { baseColumnDecoder } func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { // Serialized data layout: // +---------------+-----------------+-------------+ // | may have null | null indicators | values | // +---------------+-----------------+-------------+ // | byte | list[byte] | list[entry] | // +---------------+-----------------+-------------+ // // Each entry is represented as: // +---------------+-------+ // | value length | value | // +---------------+-------+ // | int32 | bytes | // +---------------+-------+ if TEXT != dataType { return nil, fmt.Errorf("invalid data type: %v", dataType) } nullIndicators, err := deserializeNullIndicators(reader, positionCount) if err != nil { return nil, err } values := make([]*Binary, positionCount) for i := int32(0); i < positionCount; i++ { if nullIndicators != nil && nullIndicators[i] { continue } var length int32 err := binary.Read(reader, binary.BigEndian, &length) if err != nil { return nil, err } value := make([]byte, length) _, err = reader.Read(value) if err != nil { return nil, err } values[i] = NewBinary(value) } return NewBinaryColumn(0, positionCount, nullIndicators, values) } type RunLengthColumnDecoder struct { baseColumnDecoder } func (decoder *RunLengthColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) { // Serialized data layout: // +-----------+-------------------------+ // | encoding | serialized inner column | // +-----------+-------------------------+ // | byte | list[byte] | // +-----------+-------------------------+ columnEncoding, err := deserializeColumnEncoding(reader) if err != nil { return nil, err } columnDecoder, err := getColumnDecoder(columnEncoding) if err != nil { return nil, err } column, err := columnDecoder.ReadColumn(reader, dataType, 1) if err != nil { return nil, err } return NewRunLengthEncodedColumn(column, positionCount) }