memstore/list/archive_vector_party.go (321 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
"fmt"
"github.com/uber/aresdb/cgoutils"
"github.com/uber/aresdb/diskstore"
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/memstore/vectors"
"github.com/uber/aresdb/utils"
"io"
"os"
"sync"
"unsafe"
)
// ListVectorPartyHeader is the magic number written at the head of a
// serialized list vector party; Read uses it as a corruption sanity check.
const (
	ListVectorPartyHeader uint32 = 0xFADEFACF
)
// ArchiveVectorParty is the representation of list data type vector party in archive store.
// It does not support random access update. Instead updates to archiveListVectorParty can only be done
// via appending to the tail during archiving and backfill.
// It use a single value vector to store the values and validities so that it has the same
// in memory representation except archiving vp does not have cap vector.
// The only update supported is same length value in place change
type ArchiveVectorParty struct {
	baseVectorParty
	// Pinnable provides pin/unpin reference counting (AllUsersDone) so users
	// can hold the vp in memory while it is being read.
	common.Pinnable
	// values stores the raw list payloads (values and validities) back to back;
	// per-row (offset, length) pairs in baseVectorParty.offsets locate them.
	values *vectors.Vector
	// bytesWritten should only be used when archiving or backfilling. It's used to record the current position
	// in values vector.
	bytesWritten int64
	// lengthFilled is to record the number of records appended, which can not exceed length
	lengthFilled int
	// totalValueBytes is the allocated capacity of the values vector in bytes.
	totalValueBytes int64
}
// NewArchiveVectorParty returns a new ArchiveVectorParty.
// It should only be used during backfill or archiving when constructing a new list
// archiving vp.
// Length is the number of total rows and totalValueBytes is the total bytes used to
// store values and validities. locker is the batch lock backing the
// Pinnable.AllUsersDone condition variable.
func NewArchiveVectorParty(length int, dataType common.DataType,
	totalValueBytes int64, locker sync.Locker) common.ArchiveVectorParty {
	return newArchiveVectorParty(length, dataType, totalValueBytes, locker)
}
// newArchiveVectorParty builds the concrete *ArchiveVectorParty shared by the
// exported constructor. No storage is allocated here; callers invoke Allocate
// (or Read) before use.
func newArchiveVectorParty(length int, dataType common.DataType,
	totalValueBytes int64, locker sync.Locker) *ArchiveVectorParty {
	party := &ArchiveVectorParty{
		baseVectorParty: baseVectorParty{
			dataType: dataType,
			length:   length,
		},
		Pinnable: common.Pinnable{
			AllUsersDone: sync.NewCond(locker),
		},
		totalValueBytes: totalValueBytes,
	}
	// Wire the base's value accessor to this type's implementation.
	party.baseVectorParty.getDataValueFn = party.GetDataValue
	return party
}
// Allocate allocate underlying storage for vector party. Note allocation for
// archive vp does not report host memory change. Memory reporting is done
// after switching to the new version of archive store. Before switching the memory
// managed by this vp is counted as unmanaged memory.
// hasCount is ignored: list archive vps never carry a count vector.
func (vp *ArchiveVectorParty) Allocate(hasCount bool) {
	// Two uint32 entries per row: (offset, length).
	vp.offsets = vectors.NewVector(common.Uint32, vp.length*2)
	vp.values = vectors.NewVector(common.Uint8, int(vp.totalValueBytes))
}
// GetBytes returns the total memory footprint of this vp in bytes, or 0 when
// either underlying vector has not been allocated (or was destructed).
func (vp *ArchiveVectorParty) GetBytes() int64 {
	if vp.values == nil || vp.offsets == nil {
		return 0
	}
	return int64(vp.values.Bytes + vp.offsets.Bytes)
}
// SafeDestruct destructs vector party memory. It is safe to call on a vp whose
// vectors were never allocated, and a second call is a no-op: each vector is
// nil-checked before being destructed (Read's error-cleanup path does the same
// nil check, so Vector.SafeDestruct is not assumed to be nil-safe).
func (vp *ArchiveVectorParty) SafeDestruct() {
	if vp == nil {
		return
	}
	if vp.offsets != nil {
		vp.offsets.SafeDestruct()
		vp.offsets = nil
	}
	if vp.values != nil {
		vp.values.SafeDestruct()
		vp.values = nil
	}
}
// AsList is the implementation from common.VectorParty. It exposes this vp
// through the common.ListVectorParty interface.
func (vp *ArchiveVectorParty) AsList() common.ListVectorParty {
	return vp
}
// Equals is the implementation from common.VectorParty. It delegates to the
// shared list-equality helper on baseVectorParty.
func (vp *ArchiveVectorParty) Equals(other common.VectorParty) bool {
	return vp.equals(other, vp.AsList())
}
// getValue returns a pointer into the values vector for the list stored at
// row, plus a validity flag. A valid but zero-length list yields (nil, true);
// an invalid (null) row yields (nil, false).
func (vp *ArchiveVectorParty) getValue(row int) (val unsafe.Pointer, validity bool) {
	offset, elemCount, valid := vp.GetOffsetLength(row)
	switch {
	case !valid:
		return nil, false
	case elemCount == 0:
		return nil, true
	}
	return unsafe.Pointer(uintptr(vp.values.Buffer()) + uintptr(offset)), true
}
// GetDataValue reads the list at row and wraps it in a common.DataValue
// (implements the accessor that baseVectorParty leaves to concrete types).
func (vp *ArchiveVectorParty) GetDataValue(row int) common.DataValue {
	ptr, ok := vp.getValue(row)
	return common.DataValue{
		Valid:    ok,
		DataType: vp.dataType,
		OtherVal: ptr,
	}
}
// GetDataValueByRow just call GetDataValue: list vps have no count vector, so
// row positions map directly to value positions.
func (vp *ArchiveVectorParty) GetDataValueByRow(row int) common.DataValue {
	return vp.GetDataValue(row)
}
// setValue writes the list value for row. Only two writes are legal:
//  1. in-place overwrite of an already-filled row with the exact same element
//     count (so the value fits the previously reserved bytes), or
//  2. append at row == lengthFilled (rows are filled strictly sequentially).
// Anything else panics, as does overflowing totalValueBytes.
// val points at the raw payload whose first 4 bytes hold the element count;
// nil val with valid=true is treated as an empty list.
func (vp *ArchiveVectorParty) setValue(row int, val unsafe.Pointer, valid bool) {
	if !valid {
		// Null entry: record nil offset/length and advance the fill cursor
		// if this is a fresh row.
		vp.SetOffsetLength(row, nil, nil)
		if row >= vp.lengthFilled {
			vp.lengthFilled++
		}
		return
	}
	var newLen uint32
	if val != nil {
		newLen = *(*uint32)(val)
	}
	newBytes := common.CalculateListElementBytes(vp.dataType, int(newLen))
	if row < vp.lengthFilled {
		oldOffset, oldLen, _ := vp.GetOffsetLength(row)
		if oldLen != newLen {
			// invalid in-place update, should never happen
			utils.GetLogger().Panic("in-place update array archive vp with different length")
		} else {
			baseAddr := uintptr(vp.values.Buffer()) + uintptr(oldOffset)
			utils.MemCopy(unsafe.Pointer(baseAddr), val, newBytes)
		}
	} else if row == vp.lengthFilled {
		if vp.bytesWritten+int64(newBytes) > vp.totalValueBytes {
			utils.GetLogger().Panicf("Array ArchiveVectorParty SetValue exceeded buffer limit")
		}
		// Record the new row's offset/length. Pass a uint32 copy of the write
		// cursor: the previous code handed SetOffsetLength a pointer to
		// vp.bytesWritten (an int64) reinterpreted as *uint32, which reads the
		// wrong 4 bytes on big-endian hosts. The offsets vector is Uint32, so
		// a uint32 offset is what it stores.
		offset := uint32(vp.bytesWritten)
		vp.SetOffsetLength(row, unsafe.Pointer(&offset), unsafe.Pointer(&newLen))
		baseAddr := uintptr(vp.values.Buffer()) + uintptr(vp.bytesWritten)
		utils.MemCopy(unsafe.Pointer(baseAddr), val, newBytes)
		vp.lengthFilled++
		vp.bytesWritten += int64(newBytes)
	} else {
		// invalid jump update, should never happen
		utils.GetLogger().Panic("jump update array archive vp")
	}
}
// SetDataValue is the implementation of common.VectorParty.
// countsUpdateMode and counts are ignored: list vps do not keep counts.
func (vp *ArchiveVectorParty) SetDataValue(row int, value common.DataValue,
	countsUpdateMode common.ValueCountsUpdateMode, counts ...uint32) {
	vp.setValue(row, value.OtherVal, value.Valid)
}
// GetListValue is the implementation of common.ListVectorParty.
func (vp *ArchiveVectorParty) GetListValue(row int) (unsafe.Pointer, bool) {
	return vp.getValue(row)
}
// SetListValue is the implementation of common.ListVectorParty.
// See setValue for the append-only / same-length update constraints.
func (vp *ArchiveVectorParty) SetListValue(row int, val unsafe.Pointer, valid bool) {
	vp.setValue(row, val, valid)
}
// Write serializes this vector party to writer in the list-VP on-disk layout:
// magic header, row length, data type, non-default value count (always 0),
// column mode, 6 padding bytes, the offsets vector, the value byte count, and
// finally the value vector. Returns the first write error encountered.
func (vp *ArchiveVectorParty) Write(writer io.Writer) error {
	out := utils.NewStreamDataWriter(writer)
	if err := out.WriteUint32(ListVectorPartyHeader); err != nil {
		return err
	}
	// Number of rows.
	if err := out.WriteInt32(int32(vp.length)); err != nil {
		return err
	}
	// Element data type.
	if err := out.WriteUint32(uint32(vp.dataType)); err != nil {
		return err
	}
	// Non-default value count is always 0 for list vector parties.
	if err := out.WriteInt32(int32(0)); err != nil {
		return err
	}
	// Column mode is fixed to AllValuesPresent for now.
	if err := out.WriteUint16(uint16(common.AllValuesPresent)); err != nil {
		return err
	}
	// 6 bytes of padding after the 2-byte mode.
	if err := out.SkipBytes(6); err != nil {
		return err
	}
	// Stream the offsets vector straight out of C-allocated memory.
	offsetsSlice := cgoutils.MakeSliceFromCPtr(uintptr(vp.offsets.Buffer()), vp.offsets.Bytes)
	if err := out.Write(offsetsSlice); err != nil {
		return err
	}
	// Value vector size, then the value vector itself.
	if err := out.WriteUint64(uint64(vp.values.Bytes)); err != nil {
		return err
	}
	valuesSlice := cgoutils.MakeSliceFromCPtr(uintptr(vp.values.Buffer()), vp.values.Bytes)
	if err := out.Write(valuesSlice); err != nil {
		return err
	}
	return nil
}
// Read reads a vector party from underlying reader. It first reads header from the reader and does
// several sanity checks. Then it reads vectors based on vector party mode.
// On any error the partially-allocated vectors are destructed before returning.
// s, when non-nil, is used to report the memory consumed by the loaded vectors.
func (vp *ArchiveVectorParty) Read(reader io.Reader, s common.VectorPartySerializer) error {
	dataReader := utils.NewStreamDataReader(reader)
	magicNumber, err := dataReader.ReadUint32()
	// All reads below assign into this same function-scope err (':=' with one
	// new variable reuses it), so the deferred cleanup sees the final value
	// and frees whatever vectors were allocated before the failure.
	defer func() {
		if err != nil {
			if vp.offsets != nil {
				vp.offsets.SafeDestruct()
			}
			if vp.values != nil {
				vp.values.SafeDestruct()
			}
		}
	}()
	if err != nil {
		return err
	}
	if magicNumber != ListVectorPartyHeader {
		return utils.StackError(nil, "Magic number does not match, vector party file may be corrupted")
	}
	rawLength, err := dataReader.ReadInt32()
	if err != nil {
		return err
	}
	length := int(rawLength)
	rawDataType, err := dataReader.ReadUint32()
	if err != nil {
		return err
	}
	dataType, err := common.NewDataType(rawDataType)
	if err != nil {
		return err
	}
	// non default value count: present in the format but unused for list vps.
	_, err = dataReader.ReadInt32()
	if err != nil {
		return err
	}
	// column mode: validated but otherwise unused (lists are AllValuesPresent).
	m, err := dataReader.ReadUint16()
	if err != nil {
		return err
	}
	columnMode := common.ColumnMode(m)
	if columnMode >= common.MaxColumnMode {
		return utils.StackError(nil, "Invalid mode %d", columnMode)
	}
	// Read unused bytes (the 6 padding bytes written after the mode).
	err = dataReader.SkipBytes(6)
	if err != nil {
		return err
	}
	vp.length = length
	vp.dataType = dataType
	// Offsets vector: two uint32 entries (offset, length) per row, filled
	// directly from the reader into C-allocated memory.
	vp.offsets = vectors.NewVector(common.Uint32, vp.length*2)
	if err = dataReader.Read(cgoutils.MakeSliceFromCPtr(uintptr(vp.offsets.Buffer()), vp.offsets.Bytes)); err != nil {
		return err
	}
	// Read value bytes (size of the value vector that follows).
	bytes, err := dataReader.ReadUint64()
	if err != nil {
		return err
	}
	vp.totalValueBytes = int64(bytes)
	// Read value vector.
	vp.values = vectors.NewVector(common.Uint8, int(vp.totalValueBytes))
	// Here we directly read from reader into the c allocated bytes.
	if err = dataReader.Read(cgoutils.MakeSliceFromCPtr(uintptr(vp.values.Buffer()), vp.values.Bytes)); err != nil {
		return err
	}
	// A vp loaded from disk is fully populated: position the append cursors
	// at the end so no further appends can occur without overflow checks.
	vp.bytesWritten = vp.totalValueBytes
	vp.lengthFilled = vp.length
	if s != nil {
		// memory usage:
		// 1. offset vector party: (4 bytes offset + 4 bytes length) * length
		// 2. value vector is totalValueBytes of uint8
		s.ReportVectorPartyMemoryUsage(int64(vp.length*4*2) + vp.totalValueBytes)
	}
	return nil
}
// GetCount returns cumulative count on specified offset. List vps never carry
// a count vector, so every row counts exactly once.
func (vp *ArchiveVectorParty) GetCount(offset int) uint32 {
	// Same as non mode 3 vector.
	return 1
}
// SetCount is not supported by list vector party and panics if called.
func (vp *ArchiveVectorParty) SetCount(offset int, count uint32) {
	utils.GetLogger().Panic("SetCount is not supported by list vector party")
}
// LoadFromDisk load archive vector party from disk caller should lock archive batch before using.
// The read happens asynchronously: Loader is incremented before the goroutine
// starts and Done is called when the read finishes, so callers wait on
// vp.Loader (WaitForDiskLoad semantics) before accessing data. A read error
// panics inside the goroutine.
func (vp *ArchiveVectorParty) LoadFromDisk(hostMemManager common.HostMemoryManager, diskStore diskstore.DiskStore,
	table string, shardID int, columnID, batchID int, batchVersion uint32, seqNum uint32) {
	vp.Loader.Add(1)
	go func() {
		serializer := common.NewVectorPartyArchiveSerializer(hostMemManager, diskStore, table, shardID, columnID, batchID, batchVersion, seqNum)
		err := serializer.ReadVectorParty(vp)
		if err != nil {
			utils.GetLogger().Panic(err)
		}
		vp.Loader.Done()
	}()
}
// Prune prunes vector party based on column mode to clean memory if possible.
// List vps always store all values, so there is nothing to release.
func (vp *ArchiveVectorParty) Prune() {
	// Nothing to prune for list vp.
}
// SliceByValue is not supported by list vector party and panics if called
// (list columns are not sortable, so value-based slicing is undefined).
func (vp *ArchiveVectorParty) SliceByValue(lowerBoundRow, upperBoundRow int, value unsafe.Pointer) (
	startRow int, endRow int, startIndex int, endIndex int) {
	utils.GetLogger().Panic("SliceByValue is not supported by list vector party")
	return
}
// SliceIndex returns [startIndex, endIndex) based on [lowerBoundRow, upperBoundRow).
// Rows map 1:1 to indexes for list vps (no count vector), so the bounds pass
// through unchanged.
func (vp *ArchiveVectorParty) SliceIndex(lowerBoundRow, upperBoundRow int) (
	startIndex, endIndex int) {
	return lowerBoundRow, upperBoundRow
}
// GetHostVectorPartySlice implements GetHostVectorPartySlice in TransferableVectorParty.
// It returns a view over rows [startIndex, startIndex+length) suitable for
// transfer to device memory: a pointer into the offsets vector plus the
// minimal span of the values vector covering those rows. Panics when the
// requested range exceeds vp.length.
func (vp *ArchiveVectorParty) GetHostVectorPartySlice(startIndex, length int) common.HostVectorPartySlice {
	if startIndex+length > vp.length {
		utils.GetLogger().Panic("Required length is over the size of ArrayArchiveVectorParty")
	}
	// Each row occupies 8 bytes (uint32 offset + uint32 length) in offsets.
	offsetStart := unsafe.Pointer(uintptr(vp.offsets.Buffer()) + uintptr(startIndex*8))
	baseAddr := uintptr(vp.values.Buffer())
	var valueStart int
	valueBytes := vp.values.Bytes
	for i := startIndex; i < (startIndex + length); i++ {
		// find first entry which has non-zero length array value, which will have valid offset
		// if not found, then will start from baseAddr
		offset, count, valid := vp.GetOffsetLength(i)
		if valid && count > 0 {
			valueStart = int(offset)
			break
		}
	}
	for i := startIndex + length; i < vp.length; i++ {
		// find first entry which has non-zero length array value, which will have valid offset
		// if not found, then will be the end of value buffer
		offset, count, valid := vp.GetOffsetLength(i)
		if valid && count > 0 {
			valueBytes = int(offset)
			break
		}
	}
	// ValueOffsetAdjust lets the consumer rebase the row offsets (which are
	// absolute within the full values vector) against the sliced Values start.
	return common.HostVectorPartySlice{
		Values:            unsafe.Pointer(baseAddr + uintptr(valueStart)),
		ValueBytes:        valueBytes - valueStart,
		ValueOffsetAdjust: valueStart,
		Length:            length,
		Offsets:           offsetStart,
		ValueType:         vp.dataType,
	}
}
// Dump writes a human-readable rendering of every row to file.
// For testing/debugging purposes only.
func (vp *ArchiveVectorParty) Dump(file *os.File) {
	fmt.Fprintf(file, "\nArray ArchiveVectorParty, type: %s, length: %d, value: \n", common.DataTypeName[vp.dataType], vp.GetLength())
	rows := vp.GetLength()
	for row := 0; row < rows; row++ {
		value := vp.GetDataValue(row)
		if !value.Valid {
			fmt.Fprintf(file, "\tnil\n")
			continue
		}
		fmt.Fprintf(file, "\t%v\n", value.ConvertToHumanReadable(vp.dataType))
	}
}
// CopyOnWrite clone vector party for updates, the update can only for in-place
// change with same length for update row.
// batchSize is the row count of the clone; callers are expected to pass a
// size >= vp.length so the offsets copy below fits — TODO(review) confirm
// all call sites honor this.
func (vp *ArchiveVectorParty) CopyOnWrite(batchSize int) common.ArchiveVectorParty {
	// archive vector party should always have allUsersDone initialized correctly
	// with batch rwlock, so reuse its locker for the clone.
	newVP := newArchiveVectorParty(batchSize, vp.dataType, vp.totalValueBytes, vp.AllUsersDone.L)
	newVP.Allocate(false)
	if vp.values != nil {
		utils.MemCopy(unsafe.Pointer(newVP.values.Buffer()), unsafe.Pointer(vp.values.Buffer()), vp.values.Bytes)
	}
	if vp.offsets != nil {
		utils.MemCopy(unsafe.Pointer(newVP.offsets.Buffer()), unsafe.Pointer(vp.offsets.Buffer()), vp.offsets.Bytes)
	}
	// totalValueBytes is already set by newArchiveVectorParty (the redundant
	// re-assignment was removed); only the write cursors need carrying over.
	newVP.bytesWritten = vp.bytesWritten
	newVP.lengthFilled = vp.lengthFilled
	return newVP
}