memstore/list/live_vector_party.go (320 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
"fmt"
"github.com/uber/aresdb/cgoutils"
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/memstore/vectors"
"github.com/uber/aresdb/utils"
"io"
"os"
"sync"
"unsafe"
)
// LiveVectorParty is the representation of list data type vector party in live store.
// It supports random access read and write, and it can be serialized with Write and
// loaded back with Read.
// Underneath, it stores the list element bytes in a high level memory pool owned by
// this vector party. Therefore, when this vector party is destructed (SafeDestruct),
// the underlying memory pool is destroyed as well.
type LiveVectorParty struct {
baseVectorParty
// caps stores, for each row, the slab footer offset returned by the memory pool when
// the row's element was (re)allocated; SetValue passes it back to the pool on
// reallocation.
caps *vectors.Vector
// memoryPool holds the element bytes of every row; the offsets vector inherited from
// baseVectorParty records where each row's element lives inside it.
memoryPool HighLevelMemoryPool
// RWMutex guards the fields above in the methods that take it (e.g. GetValue,
// SetValue, Read, Write). Not every method locks: Allocate, SetLength and
// GetHostVectorPartySlice access fields without it.
sync.RWMutex
}
// GetBytes returns the number of bytes held by the offsets and caps vectors.
// Memory owned by the memory pool is not counted; GetTotalBytes includes it.
func (vp *LiveVectorParty) GetBytes() int64 {
	vp.RLock()
	defer vp.RUnlock()

	var total int64
	if offsets := vp.offsets; offsets != nil {
		total += int64(offsets.Bytes)
	}
	if caps := vp.caps; caps != nil {
		total += int64(caps.Bytes)
	}
	return total
}
// GetTotalBytes returns the bytes occupied by this vector party including the
// memory pool: the offsets and caps vectors plus the total bytes reported by the
// pool's native memory allocator.
func (vp *LiveVectorParty) GetTotalBytes() int64 {
	vp.RLock()
	defer vp.RUnlock()

	var total int64
	if pool := vp.memoryPool; pool != nil {
		total += pool.GetNativeMemoryAllocator().GetTotalBytes()
	}
	if caps := vp.caps; caps != nil {
		total += int64(caps.Bytes)
	}
	if offsets := vp.offsets; offsets != nil {
		total += int64(offsets.Bytes)
	}
	return total
}
// SafeDestruct releases the offsets and caps vectors and destroys the memory pool,
// resetting each of them to nil so that calling it again is harmless. It is a no-op
// on a nil receiver.
func (vp *LiveVectorParty) SafeDestruct() {
	// The nil check must come before Lock: locking goes through the embedded
	// RWMutex and would dereference a nil receiver.
	if vp == nil {
		return
	}
	vp.Lock()
	defer vp.Unlock()

	if vp.offsets != nil {
		vp.offsets.SafeDestruct()
		vp.offsets = nil
	}
	if vp.caps != nil {
		vp.caps.SafeDestruct()
		vp.caps = nil
	}
	if vp.memoryPool != nil {
		vp.memoryPool.Destroy()
		vp.memoryPool = nil
	}
}
// Write serializes the vector party to writer using the list vector party layout
// that is kept compatible with the archive vector party:
//
//	header:  ListVectorPartyHeader (uint32), length (int32), data type (uint32),
//	         non-default value count (int32, always 0), column mode (uint16,
//	         always AllValuesPresent), 6 bytes of padding
//	offsets: one <offset, length> uint32 pair per row, padded to 64 bytes
//	values:  total value bytes (uint64, value size aligned to 64 bytes) followed
//	         by every row's element bytes in row order, padded to 64 bytes
//
// Offsets are relative to the start of the values section; null rows are written
// with offset 0 and valid empty lists with common.ZeroLengthArrayFlag. The first
// write error encountered is returned.
func (vp *LiveVectorParty) Write(writer io.Writer) error {
	vp.RLock()
	defer vp.RUnlock()

	dataWriter := utils.NewStreamDataWriter(writer)
	if err := dataWriter.WriteUint32(ListVectorPartyHeader); err != nil {
		return err
	}
	if err := dataWriter.WriteInt32(int32(vp.length)); err != nil {
		return err
	}
	if err := dataWriter.WriteUint32(uint32(vp.dataType)); err != nil {
		return err
	}
	// Non-default value count is always 0 for list vector parties.
	if err := dataWriter.WriteInt32(int32(0)); err != nil {
		return err
	}
	// Column mode: AllValuesPresent for now.
	columnMode := common.AllValuesPresent
	if err := dataWriter.WriteUint16(uint16(columnMode)); err != nil {
		return err
	}
	if err := dataWriter.SkipBytes(6); err != nil {
		return err
	}

	// Offsets section. Every write is checked so a failing writer cannot produce a
	// truncated file while Write reports success.
	var valueBytes int
	for i := 0; i < vp.length; i++ {
		_, length, valid := vp.GetOffsetLength(i)
		offset := uint32(valueBytes)
		if !valid {
			offset = 0
		} else if length == 0 {
			offset = uint32(common.ZeroLengthArrayFlag)
		}
		if err := dataWriter.WriteUint32(offset); err != nil {
			return err
		}
		if err := dataWriter.WriteUint32(length); err != nil {
			return err
		}
		valueBytes += common.CalculateListElementBytes(vp.dataType, int(length))
	}
	// Align to 64 bytes to stay compatible with the archive vector party.
	if err := dataWriter.WritePadding(8*vp.length, 64); err != nil {
		return err
	}

	// Values section, also 64-byte aligned for archive compatibility.
	if err := dataWriter.WriteUint64(uint64(utils.AlignOffset(valueBytes, 64))); err != nil {
		return err
	}
	if valueBytes > 0 {
		// Only touch the memory pool when there is element data: a vector party
		// without values may have no pool (never allocated, or already destructed).
		baseAddr := vp.memoryPool.GetNativeMemoryAllocator().GetBaseAddr()
		for i := 0; i < vp.length; i++ {
			offset, length, _ := vp.GetOffsetLength(i)
			bytes := common.CalculateListElementBytes(vp.dataType, int(length))
			if bytes <= 0 {
				continue
			}
			if err := dataWriter.Write(cgoutils.MakeSliceFromCPtr(baseAddr+uintptr(offset), bytes)); err != nil {
				return err
			}
		}
	}
	return dataWriter.WritePadding(valueBytes, 64)
}
// Read deserializes a vector party in the layout produced by Write from reader,
// allocating fresh offsets, caps and memory pool storage for it.
//
// On any error the partially built storage is released via SafeDestruct and the
// error is returned. After a successful read, if serializer is non-nil, the bytes
// reported by GetBytes (offsets and caps, excluding the memory pool) are passed to
// serializer.ReportVectorPartyMemoryUsage.
func (vp *LiveVectorParty) Read(reader io.Reader, serializer common.VectorPartySerializer) error {
	// Deserialize under the write lock, but release it before calling SafeDestruct
	// or GetBytes: both take vp's RWMutex themselves, and sync.RWMutex is not
	// reentrant, so calling them while still holding the lock would deadlock.
	err := func() error {
		vp.Lock()
		defer vp.Unlock()
		return vp.readLocked(reader)
	}()
	if err != nil {
		vp.SafeDestruct()
		return err
	}
	if serializer != nil {
		serializer.ReportVectorPartyMemoryUsage(vp.GetBytes())
	}
	return nil
}

// readLocked performs the deserialization for Read. The caller must hold vp's write
// lock and this function must not call any method of vp that takes the lock.
func (vp *LiveVectorParty) readLocked(reader io.Reader) error {
	dataReader := utils.NewStreamDataReader(reader)

	magicNumber, err := dataReader.ReadUint32()
	if err != nil {
		return err
	}
	if magicNumber != ListVectorPartyHeader {
		return utils.StackError(nil, "Magic number does not match, vector party file may be corrupted")
	}
	rawLength, err := dataReader.ReadInt32()
	if err != nil {
		return err
	}
	if rawLength < 0 {
		return utils.StackError(nil, "Invalid length %d, vector party file may be corrupted", rawLength)
	}
	rawDataType, err := dataReader.ReadUint32()
	if err != nil {
		return err
	}
	dataType, err := common.NewDataType(rawDataType)
	if err != nil {
		return err
	}
	// Non-default value count: always 0 for list vector parties, ignored here.
	if _, err = dataReader.ReadInt32(); err != nil {
		return err
	}
	m, err := dataReader.ReadUint16()
	if err != nil {
		return err
	}
	columnMode := common.ColumnMode(m)
	if columnMode >= common.MaxColumnMode {
		return utils.StackError(nil, "Invalid mode %d", columnMode)
	}
	// 6 unused padding bytes follow the column mode.
	if err = dataReader.SkipBytes(6); err != nil {
		return err
	}

	vp.length = int(rawLength)
	vp.dataType = dataType
	vp.Allocate(false)

	// Load the raw <offset, length> pairs. Offsets are relative to the file's values
	// section; rows that carry element bytes get their pool offset patched in below.
	if err = dataReader.Read(cgoutils.MakeSliceFromCPtr(uintptr(vp.offsets.Buffer()), vp.offsets.Bytes)); err != nil {
		return err
	}
	totalValueBytes, err := dataReader.ReadUint64()
	if err != nil {
		return err
	}

	var zero uint32
	var consumed int64
	for i := 0; i < vp.length; i++ {
		itemLen := *(*uint32)(vp.offsets.GetValue(2*i + 1))
		itemBytes := common.CalculateListElementBytes(vp.dataType, int(itemLen))
		if itemBytes <= 0 {
			// Nothing to allocate. Keep the offset as written (0 for null rows,
			// common.ZeroLengthArrayFlag for empty lists) so empty lists are not
			// turned into nulls; only the footer offset needs resetting.
			vp.caps.SetValue(i, unsafe.Pointer(&zero))
			continue
		}
		buf := vp.memoryPool.Allocate(itemBytes)
		// buf[0] is the element offset inside the pool, buf[1] its slab footer offset.
		vp.offsets.SetValue(2*i, unsafe.Pointer(&buf[0]))
		vp.caps.SetValue(i, unsafe.Pointer(&buf[1]))
		addr := vp.memoryPool.Interpret(buf[0])
		if err = dataReader.Read(cgoutils.MakeSliceFromCPtr(addr, itemBytes)); err != nil {
			return err
		}
		consumed += int64(itemBytes)
	}

	// Consume the trailing alignment padding of the values section so the reader is
	// positioned right after this vector party.
	if rest := int64(totalValueBytes) - consumed; rest > 0 {
		if err = dataReader.SkipBytes(int(rest)); err != nil {
			return err
		}
	}
	return nil
}
// GetCap returns the value stored in caps for row, i.e. the slab footer offset
// recorded when the row's element was allocated. It is only used to free a list
// element in the live store.
func (vp *LiveVectorParty) GetCap(row int) uint32 {
	vp.RLock()
	defer vp.RUnlock()
	capPtr := (*uint32)(vp.caps.GetValue(row))
	return *capPtr
}
// SetBool is not supported by list vector party; the misuse is reported through
// the package logger's Panic.
func (vp *LiveVectorParty) SetBool(offset int, val bool, valid bool) {
	logger := utils.GetLogger()
	logger.Panic("SetBool is not supported by list vector party")
}
// SetValue replaces the list element at row. It implements common.LiveVectorParty.
//
// When valid is true and val is non-nil, the element length is read as a uint32
// from the start of val, the row's storage is reallocated from the memory pool to
// CalculateListElementBytes of that length, and that many bytes are copied from val.
// When valid is true and val is nil, the row gets a zero-length element. When valid
// is false, the row's offset/length are cleared via SetOffsetLength(row, nil, nil)
// and its old storage is reallocated down to zero bytes.
func (vp *LiveVectorParty) SetValue(row int, val unsafe.Pointer, valid bool) {
	vp.Lock()
	defer vp.Unlock()

	newLen := 0
	if valid && val != nil {
		newLen = int(*(*uint32)(val))
	}

	prevOffset, prevLen, _ := vp.GetOffsetLength(row)
	prevCap := *(*uint32)(vp.caps.GetValue(row))
	prevBytes := common.CalculateListElementBytes(vp.dataType, int(prevLen))
	newBytes := common.CalculateListElementBytes(vp.dataType, newLen)

	// slot[0] is the element offset inside the pool, slot[1] its slab footer offset.
	slot := vp.memoryPool.Reallocate([2]uintptr{uintptr(prevOffset), uintptr(prevCap)}, prevBytes, newBytes)

	if !valid {
		vp.SetOffsetLength(row, nil, nil)
		vp.caps.SetValue(row, unsafe.Pointer(&slot[1]))
		return
	}

	vp.SetOffsetLength(row, unsafe.Pointer(&slot[0]), unsafe.Pointer(&newLen))
	vp.caps.SetValue(row, unsafe.Pointer(&slot[1]))
	dst := vp.memoryPool.Interpret(slot[0])
	utils.MemCopy(unsafe.Pointer(dst), val, newBytes)
}
// AsList returns vp itself viewed as a common.ListVectorParty. It implements
// common.VectorParty.
func (vp *LiveVectorParty) AsList() common.ListVectorParty {
	var list common.ListVectorParty = vp
	return list
}
// Equals compares vp with other by delegating to the shared equals helper, passing
// vp in its common.ListVectorParty form. It implements common.VectorParty.
func (vp *LiveVectorParty) Equals(other common.VectorParty) bool {
	self := vp.AsList()
	return vp.equals(other, self)
}
// GetListValue returns the list element at row exactly as GetValue does: a pointer
// to the element bytes and whether the row is valid. It implements
// common.ListVectorParty.
func (vp *LiveVectorParty) GetListValue(row int) (unsafe.Pointer, bool) {
	return vp.GetValue(row)
}
// SetListValue stores the list element at row exactly as SetValue does. It
// implements common.ListVectorParty.
func (vp *LiveVectorParty) SetListValue(row int, val unsafe.Pointer, valid bool) {
	vp.SetValue(row, val, valid)
}
// SetGoValue is not supported by list vector party; the misuse is reported through
// the package logger's Panic.
func (vp *LiveVectorParty) SetGoValue(offset int, val common.GoDataValue, valid bool) {
	logger := utils.GetLogger()
	logger.Panic("SetGoValue is not supported by list vector party")
}
// GetValue returns a pointer to the element bytes at row and whether the row is
// valid. Null rows yield (nil, false); valid zero-length rows yield (nil, true);
// otherwise the pointer is the memory pool's base address plus the row's offset.
// It implements common.VectorParty.
func (vp *LiveVectorParty) GetValue(row int) (val unsafe.Pointer, validity bool) {
	vp.RLock()
	defer vp.RUnlock()

	offset, length, valid := vp.GetOffsetLength(row)
	switch {
	case !valid:
		return nil, false
	case length == 0:
		return nil, true
	}
	base := vp.memoryPool.GetNativeMemoryAllocator().GetBaseAddr()
	return unsafe.Pointer(base + uintptr(offset)), true
}
// GetMinMaxValue is not supported by list vector party; the misuse is reported
// through the package logger's Panic. The zero values are returned only to satisfy
// the compiler.
func (vp *LiveVectorParty) GetMinMaxValue() (min, max uint32) {
	logger := utils.GetLogger()
	logger.Panic("GetMinMaxValue is not supported by list vector party")
	return 0, 0
}
// GetDataValue returns the element at row as a common.DataValue whose OtherVal
// points at the element bytes (see GetValue). Rows outside [0, length) yield
// common.NullDataValue.
func (vp *LiveVectorParty) GetDataValue(row int) common.DataValue {
	// row == vp.length is already out of range; letting it through would read the
	// offsets vector past its last row.
	if row < 0 || row >= vp.length {
		return common.NullDataValue
	}
	val, valid := vp.GetValue(row)
	return common.DataValue{
		DataType: vp.dataType,
		Valid:    valid,
		OtherVal: val,
	}
}
// SetDataValue stores value at row by forwarding value.OtherVal and value.Valid to
// SetValue. countsUpdateMode and counts are not used by list vector parties.
func (vp *LiveVectorParty) SetDataValue(row int, value common.DataValue,
	countsUpdateMode common.ValueCountsUpdateMode, counts ...uint32) {
	ptr, ok := value.OtherVal, value.Valid
	vp.SetValue(row, ptr, ok)
}
// GetDataValueByRow returns the same value as GetDataValue(row).
func (vp *LiveVectorParty) GetDataValueByRow(row int) common.DataValue {
	dv := vp.GetDataValue(row)
	return dv
}
// Allocate creates fresh storage for the vector party:
//   - caps: one uint32 per row,
//   - offsets: two uint32 per row, holding the <offset, length> pairs,
//   - a new memory pool that reports usage through vp.reporter.
//
// hasCount is ignored. Storage allocated by an earlier call is not released here.
func (vp *LiveVectorParty) Allocate(hasCount bool) {
	rows := vp.length
	vp.caps = vectors.NewVector(common.Uint32, rows)
	vp.offsets = vectors.NewVector(common.Uint32, rows*2)
	vp.memoryPool = NewHighLevelMemoryPool(vp.reporter)
}
// Dump writes a human-readable listing of the vector party (type, length and each
// row's value, or "nil" for null rows) to file. It is for testing purposes.
func (vp *LiveVectorParty) Dump(file *os.File) {
	n := vp.GetLength()
	fmt.Fprintf(file, "\nArray LiveVectorParty, type: %s, length: %d, value: \n", common.DataTypeName[vp.dataType], n)
	for row := 0; row < n; row++ {
		v := vp.GetDataValue(row)
		if !v.Valid {
			fmt.Fprintln(file, "\tnil")
			continue
		}
		fmt.Fprintf(file, "\t%v\n", v.ConvertToHumanReadable(vp.dataType))
	}
}
// GetHostVectorPartySlice implements GetHostVectorPartySlice in TransferableVectorParty.
// The slice exposes the memory pool's whole native buffer as Values/ValueBytes,
// the offsets buffer, and the data type.
func (vp *LiveVectorParty) GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice {
	// LiveVectorParty always uses startIndex = 0 with length covering the whole vector
	// party, so startIndex is ignored here.
	allocator := vp.memoryPool.GetNativeMemoryAllocator()
	return common.HostVectorPartySlice{
		Values:     unsafe.Pointer(allocator.GetBaseAddr()),
		ValueBytes: int(allocator.GetTotalBytes()),
		Length:     length,
		Offsets:    vp.offsets.Buffer(),
		ValueType:  vp.dataType,
	}
}
// SetLength overwrites the vector party's length without touching its storage.
// It is only for testing purposes; do NOT use this function in real code.
func (vp *LiveVectorParty) SetLength(length int) {
	vp.baseVectorParty.length = length
}
// NewLiveVectorParty creates a LiveVectorParty with the given length and data type
// and returns it as a common.LiveVectorParty; no storage is allocated here.
// Unmanaged memory usage changes reported by the vector party are forwarded to hmm.
// It's safe to pass a nil HostMemoryManager, in which case those reports are dropped.
func NewLiveVectorParty(length int, dataType common.DataType,
	hmm common.HostMemoryManager) common.LiveVectorParty {
	report := func(bytes int64) {
		if hmm == nil {
			return
		}
		hmm.ReportUnmanagedSpaceUsageChange(bytes)
	}
	vp := &LiveVectorParty{
		baseVectorParty: baseVectorParty{
			length:   length,
			dataType: dataType,
			reporter: report,
		},
	}
	vp.baseVectorParty.getDataValueFn = vp.GetDataValue
	return vp
}