memstore/vector_party.go (478 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"fmt"
"github.com/uber/aresdb/cgoutils"
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/memstore/vectors"
"github.com/uber/aresdb/utils"
"io"
"os"
"unsafe"
)
// TransferableVectorParty is vector party that can be transferred to gpu for processing
type TransferableVectorParty interface {
// GetHostVectorPartySlice slice vector party between [startIndex, startIndex+length) before transfer to gpu
GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice
}
// baseVectorParty is the base vector party type
type baseVectorParty struct {
// DataType of values. We need it since for mode 0 vector party, we cannot
// get data type from values vector. Also we store it on disk anyway.
dataType common.DataType
// Number of non-default values stored, not always the same as Nulls.numTrues.
nonDefaultValueCount int
// Length/Size of each vector (not necessarily number of records).
length int
// Following fields are initialized during struct initialization.
// DefaultValue for this column. For convenience.
defaultValue common.DataValue
}
// cVectorParty combines the value, null, and count vector of a column in a batch.
// Some of the vectors can be nil, following the same modes described at
// https://github.com/uber/aresdb/wiki/VectorStore#vector-party
type cVectorParty struct {
baseVectorParty
// Set during archiving/backfill and stored on disk.
columnMode common.ColumnMode
values *vectors.Vector
// Stores the validity bitmap (0 means null) for each value in values.
nulls *vectors.Vector
// Stores the accumulative count from the beginning of the vector
// to the current position. Its length is vp.Length + 1 with first value to
// be 0 and last value to be vp.Length. We can get a count of current value
// by Counts[i+1] - Counts[i] for Values[i]
counts *vectors.Vector
}
// IsList tells whether it's a list vector party or not.
func (vp *baseVectorParty) IsList() bool {
return false
}
// AsList returns ListVectorParty representation of this vector party.
// Caller should always call IsList before conversion, otherwise panic may happens
// for incompatible vps.
func (vp *baseVectorParty) AsList() common.ListVectorParty {
utils.GetLogger().Panic("Cannot convert this vp to list vp")
return nil
}
// GetLength returns the length this vector party
func (vp *baseVectorParty) GetLength() int {
return vp.length
}
// GetDataType returns the min and max value of this vector party
func (vp *baseVectorParty) GetDataType() common.DataType {
return vp.dataType
}
// GetNonDefaultValueCount get Number of non-default values stored
func (vp *baseVectorParty) GetNonDefaultValueCount() int {
return vp.nonDefaultValueCount
}
// GetMode returns the stored column mode of this vector party.
func (vp *cVectorParty) GetMode() common.ColumnMode {
return vp.columnMode
}
// fillWithDefaultValue fills the values vector and nulls vector with default value
// if it's valid. **It should be only called when both values vector and nulls vector
// presents, otherwise it will panic. Also it requires both value vector and null vector
// is initialized with zeros and have never been touched yet**
func (vp *cVectorParty) fillWithDefaultValue() {
if vp.values == nil || vp.nulls == nil {
utils.GetLogger().Panic("Calling FillWithDefaultValue with nil value vector" +
"or nil null vector")
}
if vp.defaultValue.Valid {
vp.nulls.SetAllValid()
if vp.dataType == common.Bool && vp.defaultValue.BoolVal {
vp.values.SetAllValid()
} else {
for i := 0; i < vp.values.Size; i++ {
vp.values.SetValue(i, vp.defaultValue.OtherVal)
}
}
}
}
// SafeDestruct destructs all vectors of this vector party. Corresponding pointer should be set
// as nil after destruction.
func (vp *cVectorParty) SafeDestruct() {
if vp != nil {
vp.values.SafeDestruct()
vp.values = nil
vp.nulls.SafeDestruct()
vp.nulls = nil
vp.counts.SafeDestruct()
vp.counts = nil
}
}
// GetBytes returns space occupied by this vector party.
func (vp *cVectorParty) GetBytes() int64 {
var bytes int64
if vp.values != nil {
bytes += int64(vp.values.Bytes)
}
if vp.nulls != nil {
bytes += int64(vp.nulls.Bytes)
}
if vp.counts != nil {
bytes += int64(vp.counts.Bytes)
}
return bytes
}
// setValidity set the validity of given offset and update NonDefaultValueCount.
// Third parameter count should only be passed for compressed columns. If
// not passed, the default value is 1.
func (vp *cVectorParty) setValidity(offset int, valid bool) {
vp.nulls.SetBool(offset, valid)
}
// GetValidity implements GetValidity in cVectorParty
func (vp *cVectorParty) GetValidity(offset int) bool {
if vp.columnMode == common.AllValuesDefault {
return vp.defaultValue.Valid
}
return vp.nulls == nil || vp.nulls.GetBool(offset)
}
// GetDataValue returns the DataValue for the specified index.
// It first check validity of the value, then it check whether it's a
// boolean column to decide whether to load bool value or other value
// type. Index bound is not checked!
func (vp *cVectorParty) GetDataValue(offset int) common.DataValue {
if vp.columnMode == common.AllValuesDefault {
return vp.defaultValue
}
val := common.DataValue{
Valid: vp.GetValidity(offset),
DataType: vp.dataType,
}
if !val.Valid {
return val
}
if vp.values.DataType == common.Bool {
val.IsBool = true
val.BoolVal = vp.values.GetBool(offset)
return val
}
val.OtherVal = vp.values.GetValue(offset)
val.CmpFunc = vp.values.CmpFunc
return val
}
// GetDataValueByRow implements GetDataValueByRow in cVectorParty
func (vp *cVectorParty) GetDataValueByRow(row int) common.DataValue {
offset := row
if vp.GetMode() == common.HasCountVector {
offset = vp.counts.UpperBound(0, vp.counts.Size, unsafe.Pointer(&row)) - 1
}
return vp.GetDataValue(offset)
}
// SetDataValue implements SetDataValue in cVectorParty
func (vp *cVectorParty) SetDataValue(offset int, value common.DataValue, countsUpdateMode common.ValueCountsUpdateMode, counts ...uint32) {
vp.setValidity(offset, value.Valid)
if value.Valid {
if vp.values.DataType == common.Bool {
vp.values.SetBool(offset, value.BoolVal)
} else {
vp.values.SetValue(offset, value.OtherVal)
}
} else {
if vp.values.DataType == common.Bool {
vp.values.SetBool(offset, false)
} else {
var zero [2]uint64
vp.values.SetValue(offset, unsafe.Pointer(&zero))
}
}
if countsUpdateMode == common.IgnoreCount {
return
}
isNonDefault := value.Compare(vp.defaultValue) != 0
count := 1
if len(counts) > 0 {
count = int(counts[0])
}
if countsUpdateMode == common.IncrementCount {
if isNonDefault {
vp.nonDefaultValueCount += count
}
} else if countsUpdateMode == common.CheckExistingCount {
existing := vp.GetDataValue(offset).Compare(vp.defaultValue) != 0
if !existing && isNonDefault {
vp.nonDefaultValueCount += count
} else if existing && !isNonDefault {
vp.nonDefaultValueCount -= count
}
}
}
// JudgeMode judges column mode of current vector party according to value count fields.
func (vp *cVectorParty) JudgeMode() common.ColumnMode {
if vp.nonDefaultValueCount == 0 {
// both
return common.AllValuesDefault
} else if vp.counts != nil {
// compressed columns.
return common.HasCountVector
} else if vp.nulls == nil || vp.nulls.CheckAllValid() {
// no null vector.
return common.AllValuesPresent
}
// uncompressed columns
return common.HasNullVector
}
// Equals checks whether two vector parties are the same. **Only for unit test use.**
func (vp *cVectorParty) Equals(other common.VectorParty) bool {
if vp == nil || other == nil {
return vp == nil && other == nil
}
var v2 *cVectorParty
switch v := other.(type) {
case *archiveVectorParty:
v2 = &v.cVectorParty
case *cLiveVectorParty:
v2 = &v.cVectorParty
case *cVectorParty:
v2 = v
default:
return false
}
if vp.length != v2.length {
return false
}
if vp.GetMode() != v2.GetMode() {
return false
}
// check vector elements
for i := 0; i < vp.length; i++ {
if vp.GetDataValue(i).Compare(v2.GetDataValue(i)) != 0 {
return false
}
if vp.counts != nil {
// compare first count
// usually this is not needed since first count should always be 0
if i == 0 {
if vp.counts.CmpFunc(vp.counts.GetValue(0), v2.counts.GetValue(0)) != 0 {
return false
}
}
// only compare next count
if vp.counts.CmpFunc(vp.counts.GetValue(i+1), v2.counts.GetValue(i+1)) != 0 {
return false
}
}
}
return true
}
// Slice slice the vector party into the interval of [startRow, startRow+numRows)
func (vp *cVectorParty) Slice(startRow int, numRows int) (vector common.SlicedVector) {
beginIndex := startRow
// size is the number of entries in the vector,
// size != numRows when compressed,
// although here size is initialized as if vector is uncompressed.
size := vp.length - beginIndex
if size < 0 {
size = 0
}
if size > numRows {
size = numRows
}
mode := vp.GetMode()
if mode == common.AllValuesDefault && size > 0 {
size = 1
} else if mode == common.HasCountVector {
// find the indexes [beginIndex, endIndex) based on [startRow, startRow + numRows)
lowerCount := uint32(startRow)
upperCount := uint32(startRow + numRows)
beginIndex = vp.counts.UpperBound(0, vp.counts.Size, unsafe.Pointer(&lowerCount)) - 1
endIndex := vp.counts.LowerBound(beginIndex, vp.counts.Size, unsafe.Pointer(&upperCount))
// subtract endIndex by 1 when endIndex points to vp.length+1
if endIndex == vp.counts.Size {
endIndex -= 1
}
size = endIndex - beginIndex
}
vector = common.SlicedVector{
Values: make([]interface{}, size),
Counts: make([]int, size),
}
for i := 0; i < size; i++ {
if mode == common.AllValuesDefault {
vector.Values[i] = vp.defaultValue.ConvertToHumanReadable(vp.dataType)
vector.Counts[i] = numRows
if vector.Counts[i] > vp.length {
vector.Counts[i] = vp.length
}
} else if mode == common.HasCountVector {
// compressed
vector.Values[i] = vp.GetDataValue(beginIndex + i).ConvertToHumanReadable(vp.dataType)
count := int(*(*uint32)(vp.counts.GetValue(beginIndex + i + 1))) - startRow
if count > numRows {
count = numRows
}
vector.Counts[i] = count
} else {
// uncompressed
vector.Values[i] = vp.GetDataValue(beginIndex + i).ConvertToHumanReadable(vp.dataType)
vector.Counts[i] = i + 1
}
}
return vector
}
// SliceByValue returns a subrange withing [lowerBoundRow, upperBoundRow) that matches the specified value
func (vp *cVectorParty) SliceByValue(lowerBoundRow, upperBoundRow int, value unsafe.Pointer) (
startRow int, endRow int, startIndex int, endIndex int) {
// has counts
if vp.GetMode() == common.AllValuesDefault {
// TODO: check whether value itself is null, for IS_NULL
// return as if the slice is empty [upperBound, upperBound)
if vp.defaultValue.Valid {
sliceValue := common.DataValue{
Valid: true,
DataType: vp.dataType,
}
if vp.defaultValue.IsBool {
sliceValue.IsBool = true
sliceValue.BoolVal = *(*uint32)(value) != 0
} else {
sliceValue.OtherVal = value
sliceValue.CmpFunc = common.GetCompareFunc(vp.dataType)
}
// If the default value is equal to the value, we return the whole slice.
if vp.defaultValue.Compare(sliceValue) == 0 {
return lowerBoundRow, upperBoundRow, lowerBoundRow, upperBoundRow
}
}
return upperBoundRow, upperBoundRow, upperBoundRow, upperBoundRow
} else if vp.GetMode() == common.HasCountVector {
startIndex := vp.counts.UpperBound(0, vp.counts.Size, unsafe.Pointer(&lowerBoundRow)) - 1
endIndex := vp.counts.LowerBound(startIndex, vp.counts.Size, unsafe.Pointer(&upperBoundRow))
// subtract endIndex by 1 when endIndex points to vp.length+1
if endIndex == vp.counts.Size {
endIndex -= 1
}
startIndex = vp.values.LowerBound(startIndex, endIndex, value)
endIndex = vp.values.UpperBound(startIndex, endIndex, value)
startRow = int(*(*uint32)(vp.counts.GetValue(startIndex)))
endRow = int(*(*uint32)(vp.counts.GetValue(endIndex)))
return startRow, endRow, startIndex, endIndex
} else {
startRow = vp.values.LowerBound(lowerBoundRow, upperBoundRow, value)
endRow = vp.values.UpperBound(lowerBoundRow, upperBoundRow, value)
return startRow, endRow, startRow, endRow
}
}
// SliceIndex returns the startIndex and endIndex of the vector party slice given startRow and endRow of original vector
// party.
func (vp *cVectorParty) SliceIndex(lowerBoundRow, upperBoundRow int) (startIndex, endIndex int) {
if vp.GetMode() == common.HasCountVector {
startIndex = vp.counts.UpperBound(0, vp.counts.Size, unsafe.Pointer(&lowerBoundRow)) - 1
endIndex = vp.counts.LowerBound(startIndex, vp.counts.Size, unsafe.Pointer(&upperBoundRow))
// subtract endIndex by 1 when endIndex points to vp.length+1
if endIndex == vp.counts.Size {
endIndex -= 1
}
return startIndex, endIndex
}
return lowerBoundRow, upperBoundRow
}
// Write writes a vector party to underlying writer. It first writes header and then writes vectors
// based on vector party mode. **This vector party should be from archive batch and already pruned.**
func (vp *cVectorParty) Write(writer io.Writer) error {
dataWriter := utils.NewStreamDataWriter(writer)
if err := dataWriter.WriteUint32(common.VectorPartyHeader); err != nil {
return err
}
if err := dataWriter.WriteInt32(int32(vp.length)); err != nil {
return err
}
if err := dataWriter.WriteUint32(uint32(vp.dataType)); err != nil {
return err
}
if err := dataWriter.WriteInt32(int32(vp.nonDefaultValueCount)); err != nil {
return err
}
columnMode := vp.columnMode
if err := dataWriter.WriteUint16(uint16(columnMode)); err != nil {
return err
}
// Write 6 bytes padding.
if err := dataWriter.SkipBytes(6); err != nil {
return err
}
// Starting writing vectors.
// Stop writing since there are no vectors in this vp.
if columnMode <= common.AllValuesDefault {
return nil
}
// Write value vector.
// Here we directly move data from c allocated memory into writer.
if err := dataWriter.Write(
cgoutils.MakeSliceFromCPtr(uintptr(vp.values.Buffer()), vp.values.Bytes),
); err != nil {
return err
}
// Stop writing since there are no more vectors in this vp.
if columnMode <= common.AllValuesPresent {
return nil
}
// Write null vector.
// Here we directly move data from c allocated memory into writer.
if err := dataWriter.Write(
cgoutils.MakeSliceFromCPtr(uintptr(vp.nulls.Buffer()), vp.nulls.Bytes),
); err != nil {
return err
}
// Stop writing since there are no more vectors in this vp.
if columnMode <= common.HasNullVector {
return nil
}
// Write count vector.
// Here we directly move data from c allocated memory into writer.
if err := dataWriter.Write(
cgoutils.MakeSliceFromCPtr(uintptr(vp.counts.Buffer()), vp.counts.Bytes),
); err != nil {
return err
}
return nil
}
// Read reads a vector party from underlying reader. It first reads header from the reader and does
// several sanity checks. Then it reads vectors based on vector party mode.
func (vp *cVectorParty) Read(reader io.Reader, s common.VectorPartySerializer) error {
dataReader := utils.NewStreamDataReader(reader)
magicNumber, err := dataReader.ReadUint32()
if err != nil {
return err
}
if magicNumber != common.VectorPartyHeader {
return utils.StackError(nil, "Magic number does not match, vector party file may be corrupted")
}
rawLength, err := dataReader.ReadInt32()
if err != nil {
return err
}
length := int(rawLength)
rawDataType, err := dataReader.ReadUint32()
if err != nil {
return err
}
dataType, err := common.NewDataType(rawDataType)
if err != nil {
return err
}
nonDefaultValueCount, err := dataReader.ReadInt32()
if err != nil {
return err
}
m, err := dataReader.ReadUint16()
if err != nil {
return err
}
columnMode := common.ColumnMode(m)
if columnMode >= common.MaxColumnMode {
return utils.StackError(nil, "Invalid mode %d", columnMode)
}
// Read unused bytes
err = dataReader.SkipBytes(6)
if err != nil {
return err
}
vp.length = length
vp.nonDefaultValueCount = int(nonDefaultValueCount)
vp.dataType = dataType
vp.columnMode = columnMode
if err = s.CheckVectorPartySerializable(vp); err != nil {
return err
}
bytes := vectors.CalculateVectorPartyBytes(vp.GetDataType(), vp.GetLength(),
columnMode == common.HasNullVector || columnMode == common.HasCountVector, columnMode == common.HasCountVector)
s.ReportVectorPartyMemoryUsage(int64(bytes))
// Stop reading since there are no vectors in this vp.
if columnMode <= common.AllValuesDefault {
return nil
}
// Read value vector.
valueVector := vectors.NewVector(dataType, length)
// Here we directly read from reader into the c allocated bytes.
if err = dataReader.Read(
cgoutils.MakeSliceFromCPtr(uintptr(valueVector.Buffer()), valueVector.Bytes),
); err != nil {
valueVector.SafeDestruct()
return err
}
vp.values = valueVector
// Stop reading since there are no more vectors in this vp.
if columnMode <= common.AllValuesPresent {
return nil
}
// Read null vector.
nullVector := vectors.NewVector(common.Bool, length)
// Here we directly read from reader into the c allocated bytes.
if err = dataReader.Read(
cgoutils.MakeSliceFromCPtr(uintptr(nullVector.Buffer()), nullVector.Bytes),
); err != nil {
valueVector.SafeDestruct()
nullVector.SafeDestruct()
return err
}
vp.nulls = nullVector
// Stop reading since there are no more vectors in this vp.
if columnMode <= common.HasNullVector {
return nil
}
// Read count vector.
countVector := vectors.NewVector(common.Uint32, length+1)
// Here we directly read from reader into the c allocated bytes.
if err = dataReader.Read(
cgoutils.MakeSliceFromCPtr(uintptr(countVector.Buffer()), countVector.Bytes),
); err != nil {
valueVector.SafeDestruct()
nullVector.SafeDestruct()
countVector.SafeDestruct()
return err
}
vp.counts = countVector
return nil
}
// GetHostVectorPartySlice implements GetHostVectorPartySlice in cVectorParty
func (vp *cVectorParty) GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice {
endIndex := startIndex + length
values, valueStartIndex, valueBytes := vp.values.GetSliceBytesAligned(startIndex, endIndex)
nulls, nullStartIndex, nullBytes := vp.nulls.GetSliceBytesAligned(startIndex, endIndex)
counts, countStartIndex, countBytes := vp.counts.GetSliceBytesAligned(startIndex, endIndex+1)
return common.HostVectorPartySlice{
Values: values,
ValueStartIndex: valueStartIndex,
ValueBytes: valueBytes,
ValueType: vp.dataType,
DefaultValue: vp.defaultValue,
Nulls: nulls,
NullStartIndex: nullStartIndex,
NullBytes: nullBytes,
Counts: counts,
CountStartIndex: countStartIndex,
CountBytes: countBytes,
Length: length,
}
}
// Allocates implements Allocate in cVectorParty
func (vp *cVectorParty) Allocate(hasCount bool) {
vp.values = vectors.NewVector(vp.dataType, vp.length)
vp.nulls = vectors.NewVector(common.Bool, vp.length)
vp.columnMode = common.HasNullVector
if hasCount {
vp.counts = vectors.NewVector(common.Int32, vp.length+1)
vp.columnMode = common.HasCountVector
}
}
// Dump is for testing purpose
func (vp *cVectorParty) Dump(file *os.File) {
fmt.Fprintf(file, "\nVectorParty, type: %s, length: %d, value: \n", common.DataTypeName[vp.dataType], vp.GetLength())
for i := 0; i < vp.GetLength(); i++ {
val := vp.GetDataValue(i)
count := 1
if vp.counts != nil {
count = int(*(*uint32)(vp.counts.GetValue(i + 1)) - *(*uint32)(vp.counts.GetValue(i)))
}
if val.Valid {
fmt.Fprintf(file, "\t%v, %d\n", val.ConvertToHumanReadable(vp.dataType), count)
} else {
fmt.Fprintf(file, "\tnil\n")
}
}
}