memstore/live_vector_party.go (276 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memstore import ( "bufio" "fmt" "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/memstore/list" "github.com/uber/aresdb/utils" "io" "os" "reflect" "unsafe" ) // cLiveVectorParty is the implementation of LiveVectorParty with c allocated memory // this vector party stores columns with fixed length data type type cLiveVectorParty struct { cVectorParty } // SetBool implements SetBool in LiveVectorParty interface func (vp *cLiveVectorParty) SetBool(offset int, val bool, valid bool) { vp.setValidity(offset, valid) vp.values.SetBool(offset, val) return } // SetBool implements SetValue in LiveVectorParty interface func (vp *cLiveVectorParty) SetValue(offset int, val unsafe.Pointer, valid bool) { vp.setValidity(offset, valid) if valid { vp.values.SetValue(offset, val) } else { var zero [2]uint64 vp.values.SetValue(offset, unsafe.Pointer(&zero)) } } // SetGoValue implements SetGoValue in LiveVectorParty interface func (vp *cLiveVectorParty) SetGoValue(offset int, val common.GoDataValue, valid bool) { panic("SetGoValue is not supported in cLiveVectorParty") } // GetValue implements GetValue in LiveVectorParty interface func (vp *cLiveVectorParty) GetValue(offset int) (unsafe.Pointer, bool) { return vp.values.GetValue(offset), vp.GetValidity(offset) } // goLiveVectorParty is the implementation of LiveVectorParty with go allocated memory // this vector party stores columns with variable length data type type goLiveVectorParty struct { baseVectorParty values []common.GoDataValue hostMemoryManager common.HostMemoryManager totalBytes int64 } // GetMinMaxValue implements GetMinMaxValue in LiveVectorParty interface func (vp *cLiveVectorParty) GetMinMaxValue() (min uint32, max uint32) { return vp.values.GetMinValue(), vp.values.GetMaxValue() } // Allocate implements Allocate in VectorParty interface func (vp *cLiveVectorParty) Allocate(hasCount bool) { vp.cVectorParty.Allocate(hasCount) vp.fillWithDefaultValue() } // Allocate implements Allocate in VectorParty interface func (vp *goLiveVectorParty) Allocate(hasCount bool) { vp.values = make([]common.GoDataValue, vp.length) } // SetDataValue implements SetDataValue in VectorParty interface // liveVectorParty ignores countsUpdateMode or counts func (vp *goLiveVectorParty) SetDataValue(offset int, value common.DataValue, countsUpdateMode common.ValueCountsUpdateMode, counts ...uint32) { vp.SetGoValue(offset, value.GoVal, value.Valid) } // SetBool implements SetBool in LiveVectorParty interface func (vp *goLiveVectorParty) SetBool(offset int, val bool, valid bool) { panic("SetBool is not supported in goLiveVectorParty") } // SetValue implements SetValue in LiveVectorParty interface func (vp *goLiveVectorParty) SetValue(offset int, val unsafe.Pointer, valid bool) { panic("SetValue is not supported in goLiveVectorParty") } // GetValue implements GetValue in LiveVectorParty interface func (vp *goLiveVectorParty) GetValue(offset int) (unsafe.Pointer, bool) { panic("GetValue is not supported in goLiveVectorParty") } // SetGoValue implements SetGoValue in LiveVectorParty interface func (vp *goLiveVectorParty) SetGoValue(offset int, val common.GoDataValue, valid bool) { oldBytes, newBytes := 0, 0 if vp.values[offset] != nil { oldBytes = vp.values[offset].GetBytes() } if !valid || val == nil { newBytes = 0 vp.values[offset] = nil } else { newBytes = val.GetBytes() vp.values[offset] = val } bytesChange := int64(newBytes - oldBytes) vp.hostMemoryManager.ReportUnmanagedSpaceUsageChange(bytesChange) vp.totalBytes += bytesChange } // GetDataValue implements GetDataValue in VectorParty interface func (vp *goLiveVectorParty) GetDataValue(offset int) common.DataValue { val := common.DataValue{ Valid: vp.GetValidity(offset), DataType: vp.dataType, } if !val.Valid { return val } val.GoVal = vp.values[offset] return val } // GetDataValueByRow implements GetDataValueByRow in VectorParty interface func (vp *goLiveVectorParty) GetDataValueByRow(row int) common.DataValue { return vp.GetDataValue(row) } // GetValidity implements GetValidity in VectorParty interface func (vp *goLiveVectorParty) GetValidity(offset int) bool { return vp.values[offset] != nil } // GetMinMaxValue is **not supported** by goLiveVectorParty func (vp *goLiveVectorParty) GetMinMaxValue() (min uint32, max uint32) { return 0, 0 } // GetBytes implements GetBytes in VectorParty interface func (vp *goLiveVectorParty) GetBytes() int64 { return vp.totalBytes } // Slice implements Slice in VectorParty interface func (vp *goLiveVectorParty) Slice(startRow, numRows int) common.SlicedVector { beginIndex := startRow // size is the number of entries in the vector, size := vp.length - beginIndex if size < 0 { size = 0 } if size > numRows { size = numRows } vector := common.SlicedVector{ Values: make([]interface{}, size), Counts: make([]int, size), } for i := 0; i < size; i++ { vector.Values[i] = vp.GetDataValue(beginIndex + i).ConvertToHumanReadable(vp.dataType) vector.Counts[i] = i + 1 } return vector } // Write implements Write in VectorParty interface func (vp *goLiveVectorParty) Write(writer io.Writer) error { bufferWriter := bufio.NewWriter(writer) dataWriter := utils.NewStreamDataWriter(bufferWriter) // write total bytes for reporting during loading err := dataWriter.WriteUint64(uint64(vp.GetBytes())) if err != nil { return err } // write length err = dataWriter.WriteUint32(uint32(vp.length)) if err != nil { return err } // count non nil values numValidValues := 0 for _, value := range vp.values { if value != nil { numValidValues++ } } // write number of valid values err = dataWriter.WriteUint32(uint32(numValidValues)) if err != nil { return err } allValid := numValidValues == vp.length // write values for i, value := range vp.values { if value != nil { // only write index if not all valid if !allValid { err = dataWriter.WriteUint32(uint32(i)) if err != nil { return err } } err = value.Write(&dataWriter) if err != nil { return err } } } return bufferWriter.Flush() } // Read implements Read in VectorParty interface func (vp *goLiveVectorParty) Read(reader io.Reader, serializer common.VectorPartySerializer) error { dataReader := utils.NewStreamDataReader(reader) // read total bytes for reporting during loading totalBytes, err := dataReader.ReadUint64() if err != nil { return err } vp.totalBytes = int64(totalBytes) serializer.ReportVectorPartyMemoryUsage(int64(totalBytes * utils.GolangMemoryFootprintFactor)) length, err := dataReader.ReadUint32() if err != nil { return err } vp.length = int(length) vp.Allocate(false) numValidValues, err := dataReader.ReadUint32() if err != nil { return err } allValid := numValidValues == uint32(vp.length) for i := 0; i < int(numValidValues); i++ { var index uint32 if !allValid { index, err = dataReader.ReadUint32() if err != nil { return err } } else { index = uint32(i) } goValue := common.GetGoDataValue(vp.dataType) err = goValue.Read(&dataReader) if err != nil { return err } vp.values[index] = goValue } return nil } // SafeDestruct implements SafeDestruct in VectorParty interface func (vp *goLiveVectorParty) SafeDestruct() { for i := range vp.values { vp.values[i] = nil } vp.values = nil vp.length = 0 } // Equals implements Equals in VectorParty interface func (vp *goLiveVectorParty) Equals(other common.VectorParty) bool { if vp == nil || other == nil { return vp == nil && other == nil } if vp.dataType != other.GetDataType() { return false } if vp.GetLength() != other.GetLength() { return false } otherVP, ok := other.(*goLiveVectorParty) if !ok { return false } for i, ptr := range vp.values { if ptr == nil { if otherVP.values[i] != nil { return false } } else { if !reflect.DeepEqual(vp.values[i], otherVP.values[i]) { return false } } } return true } func (vp *goLiveVectorParty) Dump(file *os.File) { fmt.Fprintf(file, "\nGO LiveVectorParty, type: %s, length: %d, value: \n", common.DataTypeName[vp.dataType], vp.GetLength()) for i := 0; i < vp.GetLength(); i++ { val := vp.GetDataValue(i) if val.Valid { fmt.Fprintf(file, "\t%v\n", val.ConvertToHumanReadable(vp.dataType)) } else { fmt.Println(file, "\tnil") } } } // NewLiveVectorParty creates LiveVectorParty func NewLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue, hostMemoryManager common.HostMemoryManager) common.LiveVectorParty { isGoType := common.IsGoType(dataType) if isGoType { return newGoLiveVetorParty(length, dataType, hostMemoryManager) } if common.IsArrayType(dataType) { return list.NewLiveVectorParty(length, dataType, hostMemoryManager) } return newCLiveVectorParty(length, dataType, defaultValue) } // newCLiveVectorParty creates a LiveVectorParty with c allocated memory func newCLiveVectorParty(length int, dataType common.DataType, defaultValue common.DataValue) *cLiveVectorParty { vp := &cLiveVectorParty{ cVectorParty: cVectorParty{ baseVectorParty: baseVectorParty{ length: length, dataType: dataType, defaultValue: defaultValue, }, }, } return vp } // newGoLiveVetorParty creates a LiveVectorParty with go allocated memory func newGoLiveVetorParty(length int, dataType common.DataType, hostMemoryManager common.HostMemoryManager) *goLiveVectorParty { vp := &goLiveVectorParty{ baseVectorParty: baseVectorParty{ length: length, dataType: dataType, defaultValue: common.NullDataValue, }, hostMemoryManager: hostMemoryManager, } return vp }