memstore/common/upsert_batch_builder.go (402 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
	"fmt"
	"github.com/uber/aresdb/utils"
	"math"
	"unsafe"
)

// ColumnUpdateMode represents how to update data from UpsertBatch
type ColumnUpdateMode int

// UpsertBatchVersion represents the version of upsert batch
type UpsertBatchVersion uint32

const (
	// UpdateOverwriteNotNull (default) will overwrite existing value if new value is NOT null, otherwise just skip
	UpdateOverwriteNotNull ColumnUpdateMode = iota
	// UpdateForceOverwrite will simply overwrite existing value even when new data is null
	UpdateForceOverwrite
	// UpdateWithAddition will add the existing value with new value if new value is not null, existing null value will be treated as 0 in calculation
	UpdateWithAddition
	// UpdateWithMin will save the minimum of existing and new value if new value is not null, existing null value will be treated as MAX_INT in calculation
	UpdateWithMin
	// UpdateWithMax will save the maximum of existing and new value if new value is not null, existing null value will be treated as MIN_INT in calculation
	UpdateWithMax
	// MaxColumnUpdateMode is the current upper limit for column update modes
	MaxColumnUpdateMode
)

const (
	// V1 is the magic version number written at the head of a serialized upsert batch.
	V1 UpsertBatchVersion = 0xFEED0001
)

// columnBuilder accumulates the per-row values of a single column and knows
// how to size and serialize them into an upsert batch buffer.
type columnBuilder struct {
	// columnID is the logical column id recorded in the column header.
	columnID int
	// dataType of the column values.
	dataType DataType
	// values holds one entry per row; a nil entry represents null.
	values []interface{}
	// numValidValues counts the non-nil entries in values; it drives GetMode.
	numValidValues int
	// updateMode controls how new values merge with existing data on upsert.
	updateMode ColumnUpdateMode
	// isTimeColumn is set for column 0 of a fact table when its type is Uint32;
	// such values get a millisecond-to-second fallback conversion in SetValue.
	isTimeColumn bool
}

// SetValue writes a value into the column at the given row; a nil value sets null.
func (c *columnBuilder) SetValue(row int, value interface{}) error {
	// Track the previous null-ness so numValidValues can be adjusted below.
	oldValueNull := c.values[row] == nil
	if value == nil {
		c.values[row] = nil
	} else {
		var err error
		c.values[row], err = ConvertValueForType(c.dataType, value)
		if err != nil {
			if c.isTimeColumn {
				// force time column value is uint32: fall back to interpreting the
				// input as epoch milliseconds and store epoch seconds.
				value64, ok := ConvertToUint64(value)
				if ok {
					c.values[row] = uint32(value64 / 1000)
				} else {
					// NOTE(review): %b (binary) is an odd verb for an arbitrary
					// interface value — %v was likely intended; confirm before changing.
					return fmt.Errorf("Invalid value at time column, value=%b", value)
				}
			} else {
				return err
			}
		}
	}
	// Keep the valid-value count in sync with the null-ness transition.
	if oldValueNull && c.values[row] != nil {
		c.numValidValues++
	} else if !oldValueNull && c.values[row] == nil {
		c.numValidValues--
	}
	return nil
}

// AddRow grows the value array by 1; the new entry starts as null.
func (c *columnBuilder) AddRow() {
	c.values = append(c.values, nil)
}

// RemoveRow shrinks the value array by 1, dropping the last row's value.
func (c *columnBuilder) RemoveRow() {
	lastValueIndex := len(c.values) - 1
	lastValueNull := c.values[lastValueIndex] == nil
	c.values = c.values[:lastValueIndex]
	if !lastValueNull {
		c.numValidValues--
	}
}

// ResetRows resets the row count to 0, keeping the backing array.
func (c *columnBuilder) ResetRows() {
	c.values = c.values[0:0]
	c.numValidValues = 0
}

// CalculateBufferSize advances *offset by the size of this column's data in
// serialized format, including null vector, offset vector and alignment padding.
func (c *columnBuilder) CalculateBufferSize(offset *int) {
	isGoType := IsGoType(c.dataType)
	isArrayType := IsArrayType(c.dataType)
	switch c.GetMode() {
	case AllValuesDefault:
		// No data serialized at all.
	case HasNullVector:
		// One bit per row for the null vector (go types carry their own nulls).
		if !isGoType {
			*offset += (len(c.values) + 7) / 8
		}
		fallthrough
	case AllValuesPresent:
		// if golang memory or array type, align to 4 bytes for offset vector
		if isGoType || isArrayType {
			*offset = utils.AlignOffset(*offset, 4)
			// 1. uint32 for each offset value, and length = numRows + 1
			// 2. last offset value is the end offset of the offset buffer
			*offset += (len(c.values) + 1) * 4
			// Padding size for value vector
			*offset = utils.AlignOffset(*offset, 8)
			// Variable-length values contribute their own serialized size.
			for _, v := range c.values {
				if v != nil {
					if isGoType {
						goVal := v.(GoDataValue)
						*offset += goVal.GetSerBytes()
					} else {
						arrVal := v.(*ArrayValue)
						*offset += arrVal.GetSerBytes()
					}
				}
			}
		} else {
			// Padding size for value vector
			*offset = utils.AlignOffset(*offset, 8)
			// fixed value size
			*offset += (DataTypeBits(c.dataType)*len(c.values) + 7) / 8
		}
	}
}

// AppendToBuffer writes the column data to buffer and advances offset.
// The layout mirrors CalculateBufferSize: optional null vector, optional
// offset vector (variable-length types), then the value vector.
func (c *columnBuilder) AppendToBuffer(writer *utils.BufferWriter) error {
	writer.AlignBytes(1)
	// both gotype and array type is variable length
	isVariableLength := IsGoType(c.dataType) || IsArrayType(c.dataType)
	switch c.GetMode() {
	case AllValuesDefault:
		return nil
	case HasNullVector:
		// only non goType needs to write null vector
		if !IsGoType(c.dataType) {
			for row := 0; row < len(c.values); row++ {
				value := c.values[row]
				if err := writer.AppendBool(value != nil); err != nil {
					return utils.StackError(err, "Failed to write null vector at row %d", row)
				}
			}
		}
		fallthrough
	case AllValuesPresent:
		var offsetWriter, valueWriter *utils.BufferWriter
		// variable length data type needs to write offsetVector
		if isVariableLength {
			// Padding to 4 byte alignment for offset vector
			writer.AlignBytes(4)
			// Fork a second writer positioned at the offset vector; the main
			// writer skips past it to the value vector region.
			writerForked := *writer
			offsetWriter = &writerForked
			// skip offset bytes
			writer.SkipBytes((len(c.values) + 1) * 4)
		}
		// Padding to 8 byte alignment for value vector
		writer.AlignBytes(8)
		valueWriter = writer
		// local byte offset of current value in value vector
		currentValueOffset := uint32(0)
		// write values starting from current value vector offset
		for row := 0; row < len(c.values); row++ {
			// write current offset if offsetWriter is defined
			if offsetWriter != nil {
				err := offsetWriter.AppendUint32(currentValueOffset)
				if err != nil {
					return utils.StackError(err, "Failed to write offset value at row %d", row)
				}
			}
			value := c.values[row]
			// Handle null value.
			if value == nil {
				// only skip bits when there is no offset vector
				if offsetWriter == nil {
					valueWriter.SkipBits(DataTypeBits(c.dataType))
				}
				continue
			}
			switch c.dataType {
			case Bool:
				if err := valueWriter.AppendBool(value.(bool)); err != nil {
					return utils.StackError(err, "Failed to write bool value at row %d", row)
				}
			case Int8:
				if err := valueWriter.AppendInt8(value.(int8)); err != nil {
					return utils.StackError(err, "Failed to write int8 value at row %d", row)
				}
			case Uint8:
				if err := valueWriter.AppendUint8(value.(uint8)); err != nil {
					return utils.StackError(err, "Failed to write uint8 value at row %d", row)
				}
			case Int16:
				if err := valueWriter.AppendInt16(value.(int16)); err != nil {
					return utils.StackError(err, "Failed to write int16 value at row %d", row)
				}
			case Uint16:
				if err := valueWriter.AppendUint16(value.(uint16)); err != nil {
					return utils.StackError(err, "Failed to write uint16 value at row %d", row)
				}
			case Int32:
				if err := valueWriter.AppendInt32(value.(int32)); err != nil {
					return utils.StackError(err, "Failed to write int32 value at row %d", row)
				}
			case Int64:
				if err := valueWriter.AppendInt64(value.(int64)); err != nil {
					return utils.StackError(err, "Failed to write int64 value at row %d", row)
				}
			case Uint32:
				if err := valueWriter.AppendUint32(value.(uint32)); err != nil {
					return utils.StackError(err, "Failed to write uint32 value at row %d", row)
				}
			case Float32:
				if err := valueWriter.AppendFloat32(value.(float32)); err != nil {
					return utils.StackError(err, "Failed to write float32 value at row %d", row)
				}
			case SmallEnum:
				if err := valueWriter.AppendUint8(value.(uint8)); err != nil {
					return utils.StackError(err, "Failed to write small enum value at row %d", row)
				}
			case BigEnum:
				if err := valueWriter.AppendUint16(value.(uint16)); err != nil {
					return utils.StackError(err, "Failed to write big enum value at row %d", row)
				}
			case UUID:
				// Two uint64 halves; valueWriter aliases writer here, so both
				// appends land consecutively in the value vector.
				err := valueWriter.AppendUint64(value.([2]uint64)[0])
				if err == nil {
					err = writer.AppendUint64(value.([2]uint64)[1])
				}
				if err != nil {
					return utils.StackError(err, "Failed to write uuid value at row %d", row)
				}
			case GeoPoint:
				err := valueWriter.AppendFloat32(value.([2]float32)[0])
				if err == nil {
					err = writer.AppendFloat32(value.([2]float32)[1])
				}
				if err != nil {
					return utils.StackError(err, "Failed to write geopoint value at row %d", row)
				}
			case GeoShape:
				goVal := value.(GoDataValue)
				dataWriter := utils.NewStreamDataWriter(valueWriter)
				err := goVal.Write(&dataWriter)
				if err != nil {
					return utils.StackError(err, "Failed to write geoshape value at row %d", row)
				}
				// advance current offset
				currentValueOffset += uint32(goVal.GetSerBytes())
			default:
				if IsArrayType(c.dataType) {
					arrayValue := value.(*ArrayValue)
					bytes := arrayValue.GetSerBytes()
					err := arrayValue.Write(valueWriter)
					if err != nil {
						return utils.StackError(err, "Failed to write array value at row %d", row)
					}
					// advance current offset
					currentValueOffset += uint32(bytes)
				}
			}
		}
		// lastly write the final offset into offsetWriter
		if offsetWriter != nil {
			err := offsetWriter.AppendUint32(currentValueOffset)
			if err != nil {
				return utils.StackError(err, "Failed to write offset value at row %d", len(c.values))
			}
		}
	}
	// Align at byte for bit values.
	writer.AlignBytes(1)
	return nil
}

// GetMode gets the column mode based on the number of valid (non-null) values.
func (c *columnBuilder) GetMode() ColumnMode {
	if c.numValidValues == 0 {
		return AllValuesDefault
	} else if c.numValidValues == len(c.values) {
		return AllValuesPresent
	} else {
		return HasNullVector
	}
}

// UpsertBatchBuilder is the builder for constructing an UpsertBatch buffer. It allows random value
// write at (row, col).
type UpsertBatchBuilder struct {
	// NumRows is the current number of rows across all columns.
	NumRows int
	// columns holds one builder per added column.
	columns []*columnBuilder
	// isFactTable enables time-column detection in AddColumn.
	isFactTable bool
}

// NewUpsertBatchBuilder creates a new builder for constructing an UpsertBatch.
func NewUpsertBatchBuilder() *UpsertBatchBuilder {
	return &UpsertBatchBuilder{}
}
Initially, new columns have all values set to null. func (u *UpsertBatchBuilder) AddColumn(columnID int, dataType DataType) error { if len(u.columns) > math.MaxUint16 { return utils.StackError(nil, "Upsert batch cannot hold more than %d columns", math.MaxUint16) } values := make([]interface{}, u.NumRows) column := &columnBuilder{ columnID: columnID, dataType: dataType, numValidValues: 0, values: values, } if u.isFactTable && columnID == 0 && dataType == Uint32 { column.isTimeColumn = true } u.columns = append(u.columns, column) return nil } // AddColumnWithUpdateMode add a new column to the builder with update mode info. Initially, new columns have all values set to null. func (u *UpsertBatchBuilder) AddColumnWithUpdateMode(columnID int, dataType DataType, updateMode ColumnUpdateMode) error { if updateMode >= MaxColumnUpdateMode { return utils.StackError(nil, "Invalid update mode %d", updateMode) } if err := u.AddColumn(columnID, dataType); err != nil { return err } u.columns[len(u.columns)-1].updateMode = updateMode return nil } // AddRow increases the number of rows in the batch by 1. A new row with all nil values is appended // to the row array. func (u *UpsertBatchBuilder) AddRow() { for _, column := range u.columns { column.AddRow() } u.NumRows++ } // RemoveRow decreases the number of rows in the batch by 1. The last row will be removed. It's a // no-op if the number of rows is 0. func (u *UpsertBatchBuilder) RemoveRow() { if u.NumRows > 0 { for _, column := range u.columns { column.RemoveRow() } u.NumRows-- } } // ResetRows reset the row count to 0. func (u *UpsertBatchBuilder) ResetRows() { for _, column := range u.columns { column.ResetRows() } u.NumRows = 0 } // SetValue set a value to a given (row, col). 
func (u *UpsertBatchBuilder) SetValue(row int, col int, value interface{}) error {
	// Bounds-check both coordinates before delegating to the column builder.
	if row >= u.NumRows {
		return utils.StackError(nil, "Row index %d out of range %d", row, u.NumRows)
	}
	if col >= len(u.columns) {
		return utils.StackError(nil, "Col index %d out of range %d", col, len(u.columns))
	}
	return u.columns[col].SetValue(row, value)
}

// MarkFactTable flags this builder as building for a fact table, which makes
// AddColumn treat column 0 of type Uint32 as the time column.
func (u *UpsertBatchBuilder) MarkFactTable() {
	u.isFactTable = true
}

// ToByteArray produces a serialized UpsertBatch in byte array.
func (u UpsertBatchBuilder) ToByteArray() ([]byte, error) {
	// Create buffer.
	numCols := len(u.columns)
	// initialized size to 4 bytes (version number).
	versionHeaderSize := 4
	// 24 bytes consist of fixed headers:
	// [int32] num_of_rows (4 bytes)
	// [uint16] num_of_columns (2 bytes)
	// <reserve 14 bytes>
	// [uint32] arrival_time (4 bytes)
	fixedHeaderSize := 24
	columnHeaderSize := ColumnHeaderSize(numCols)
	headerSize := versionHeaderSize + fixedHeaderSize + columnHeaderSize
	// Total size = headers + per-column data (each column advances `size`).
	size := headerSize
	for _, column := range u.columns {
		column.CalculateBufferSize(&size)
	}
	size = utils.AlignOffset(size, 8)

	buffer := make([]byte, size)
	writer := utils.NewBufferWriter(buffer)
	// Write upsert batch version.
	if err := writer.AppendUint32(uint32(V1)); err != nil {
		return nil, utils.StackError(err, "Failed to write version number")
	}
	// Write fixed headers.
	if err := writer.AppendInt32(int32(u.NumRows)); err != nil {
		return nil, utils.StackError(err, "Failed to write number of rows")
	}
	if err := writer.AppendUint16(uint16(len(u.columns))); err != nil {
		return nil, utils.StackError(err, "Failed to write number of columns")
	}
	writer.SkipBytes(14)
	if err := writer.AppendUint32(uint32(utils.Now().Unix())); err != nil {
		return nil, utils.StackError(err, "Failed to write arrival time")
	}
	// The column header region sits between the fixed header and the data.
	columnHeader := NewUpsertBatchHeader(buffer[writer.GetOffset():headerSize], numCols)
	// skip to data offset
	writer.SkipBytes(columnHeaderSize)

	// Write per column data and their headers. Offsets bracket each column's
	// data region: offset i is written before, offset i+1 after.
	for i, column := range u.columns {
		if err := columnHeader.WriteColumnID(column.columnID, i); err != nil {
			return nil, err
		}
		if err := columnHeader.WriteColumnFlag(column.GetMode(), column.updateMode, i); err != nil {
			return nil, err
		}
		if err := columnHeader.WriteColumnType(column.dataType, i); err != nil {
			return nil, err
		}
		if err := columnHeader.WriteColumnOffset(writer.GetOffset(), i); err != nil {
			return nil, err
		}
		if err := column.AppendToBuffer(&writer); err != nil {
			return nil, utils.StackError(err, "Failed to write data for column %d", i)
		}
		if err := columnHeader.WriteColumnOffset(writer.GetOffset(), i+1); err != nil {
			return nil, err
		}
	}
	return buffer, nil
}

// AdditionUpdate adds the fixed-width numeric value at newValue into oldValue
// in place for the given dataType; types not listed are left untouched.
func AdditionUpdate(oldValue, newValue unsafe.Pointer, dataType DataType) {
	switch dataType {
	case Int8:
		*(*int8)(oldValue) = *(*int8)(oldValue) + *(*int8)(newValue)
	case Uint8:
		*(*uint8)(oldValue) = *(*uint8)(oldValue) + *(*uint8)(newValue)
	case Int16:
		*(*int16)(oldValue) = *(*int16)(oldValue) + *(*int16)(newValue)
	case Uint16:
		*(*uint16)(oldValue) = *(*uint16)(oldValue) + *(*uint16)(newValue)
	case Int32:
		*(*int32)(oldValue) = *(*int32)(oldValue) + *(*int32)(newValue)
	case Uint32:
		*(*uint32)(oldValue) = *(*uint32)(oldValue) + *(*uint32)(newValue)
	case Int64:
		*(*int64)(oldValue) = *(*int64)(oldValue) + *(*int64)(newValue)
	case Float32:
		*(*float32)(oldValue) = *(*float32)(oldValue) + *(*float32)(newValue)
	}
}

// MinMaxUpdate overwrites oldValue with newValue when cmpFunc(oldValue, newValue)
// has the same sign as expectedRes (the product is strictly positive), which
// implements both min and max semantics depending on the sign passed in.
func MinMaxUpdate(oldValue, newValue unsafe.Pointer, dataType DataType, cmpFunc CompareFunc, expectedRes int) {
	if compareRes := cmpFunc(oldValue, newValue); compareRes*expectedRes > 0 {
		switch dataType {
		case Int8:
			*(*int8)(oldValue) = *(*int8)(newValue)
		case Uint8:
			*(*uint8)(oldValue) = *(*uint8)(newValue)
		case Int16:
			*(*int16)(oldValue) = *(*int16)(newValue)
		case Uint16:
			*(*uint16)(oldValue) = *(*uint16)(newValue)
		case Int32:
			*(*int32)(oldValue) = *(*int32)(newValue)
		case Uint32:
			*(*uint32)(oldValue) = *(*uint32)(newValue)
		case Int64:
			*(*int64)(oldValue) = *(*int64)(newValue)
		case Float32:
			*(*float32)(oldValue) = *(*float32)(newValue)
		}
	}
}