// memstore/common/upsert_batch.go
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"bytes"
"github.com/uber/aresdb/utils"
"math"
"unsafe"
)
// columnReader contains meta data for accessing the data of a column in an UpsertBatch.
// All vectors are views into the batch's serialized buffer (or freshly allocated
// slices for extracted backfill batches); nothing here owns separate storage.
type columnReader struct {
	// The logic id of the column.
	columnID int
	// The column mode: AllValuesDefault, AllValuesPresent, or HasNullVector.
	columnMode ColumnMode
	// The column update mode, used to decide how upserts resolve conflicts.
	columnUpdateMode ColumnUpdateMode
	// DataType of the column.
	dataType DataType
	// The value vector. Can be empty depending on column mode.
	valueVector []byte
	// The null vector (bit-packed validity). Can be empty depending on column mode.
	nullVector []byte
	// The offset vector. Only used for variable length values (go types and array types),
	// with one uint32 entry per row plus a trailing end offset.
	offsetVector []byte
	// Compare function if any.
	cmpFunc CompareFunc
}
// ReadGoValue returns the GoDataValue from upsert batch at given row, or nil
// when the slot is empty or deserialization fails.
func (c *columnReader) ReadGoValue(row int) GoDataValue {
	start := c.readOffset(row)
	end := c.readOffset(row + 1)
	// A zero-length slot means the value is absent.
	if start == end {
		return nil
	}
	value := GetGoDataValue(c.dataType)
	streamReader := utils.NewStreamDataReader(bytes.NewReader(c.valueVector[start:]))
	if err := value.Read(&streamReader); err != nil {
		return nil
	}
	return value
}
// ReadValue returns the row data (fixed sized) for a column, including the pointer
// to the data, and the validity of the value.
func (c *columnReader) ReadValue(row int) (unsafe.Pointer, bool) {
	if !c.readValidity(row) {
		return nil, false
	}
	// Array types carry variable-length payloads and go through the offset vector.
	if IsArrayType(c.dataType) {
		return c.readArrayValue(row)
	}
	byteOffset := row * DataTypeBits(c.dataType) / 8
	return unsafe.Pointer(&c.valueVector[byteOffset]), true
}
// readArrayValue returns a pointer to the array value stored at the given row,
// and whether the value is present (a zero-length slot means absent).
func (c *columnReader) readArrayValue(row int) (unsafe.Pointer, bool) {
	start, end := c.readOffset(row), c.readOffset(row+1)
	if start == end {
		return nil, false
	}
	return unsafe.Pointer(&c.valueVector[start]), true
}
// ReadBool returns the row data (boolean type) for a column, and its validity.
func (c *columnReader) ReadBool(row int) (bool, bool) {
	if !c.readValidity(row) {
		return false, false
	}
	// Boolean values are bit-packed into the value vector.
	return readBool(c.valueVector, row), true
}
// readOffset returns the uint32 offset-vector entry for the given row.
// The serialized format is little-endian (see the UpsertBatch layout comment);
// this direct pointer cast performs no byte swapping, so it assumes a
// little-endian host — NOTE(review): confirm no big-endian targets are supported.
func (c *columnReader) readOffset(row int) uint32 {
	return *(*uint32)(unsafe.Pointer(&c.offsetVector[row*4]))
}
// readValidity returns the validity value of a row in a column.
func (c *columnReader) readValidity(row int) bool {
	if c.columnMode == AllValuesDefault {
		return false
	}
	if c.columnMode == AllValuesPresent {
		return true
	}
	// HasNullVector: consult the bit-packed null vector.
	return readBool(c.nullVector, row)
}
// readBool reports whether the bit at the given index of the bit-packed buffer
// is set. Bit i lives in byte i/8 at bit position i%8 (LSB first).
func readBool(buffer []byte, index int) bool {
	byteIdx, bitIdx := index/8, uint8(index%8)
	return (buffer[byteIdx]>>bitIdx)&0x1 == 0x1
}
// writeBool sets or clears the bit at the given index of the bit-packed buffer.
// Bit i lives in byte i/8 at bit position i%8 (LSB first).
func writeBool(buffer []byte, index int, value bool) {
	mask := byte(0x1) << uint8(index%8)
	if value {
		buffer[index/8] |= mask
		return
	}
	buffer[index/8] &^= mask
}
// UpsertBatch stores and indexes a serialized upsert batch of data on a particular table.
// It is used for both client-server data transfer and redo logging.
// In redo logs each batch is prepended by a 4-byte buffer size.
// The serialized buffer of the batch is in the following format:
//	[uint32] magic_number
//	[uint32] buffer_size
//
//	<begin of buffer>
//	[int32] version_number
//	[int32] num_of_rows
//	[uint16] num_of_columns
//	<reserve 14 bytes>
//	[uint32] arrival_time
//	[uint32] column_offset_0 ... [uint32] column_offset_x+1
//	[uint32] column_reserved_field1_0 ... [uint32] column_reserved_field1_x
//	[uint32] column_reserved_field2_0 ... [uint32] column_reserved_field2_x
//	[uint32] column_data_type_0 ... [uint32] column_data_type_x
//	[uint16] column_id_0 ... [uint16] column_id_x
//	[uint8] column_mode_0 ... [uint8] column_mode_x
//
//	(optional) [uint8] null_vector_0
//	(optional) [padding to 4 byte alignment uint32] offset_vector_0
//	[padding for 8 byte alignment] value_vector_0
//	...
//
//	[padding for 8 byte alignment]
//	<end of buffer>
// Each component in the serialized buffer is byte aligned (not pointer aligned or bit aligned).
// All serialized numbers are written in little-endian.
// The struct is used for both client serialization and server deserialization.
// See https://github.com/uber/aresdb/wiki/redo_logs for more details.
//
// Note: only fixed size values are supported currently.
type UpsertBatch struct {
	// Number of rows in the batch, must be between 0 and 65535.
	NumRows int
	// Number of columns.
	NumColumns int
	// Arrival Time of Upsert Batch.
	ArrivalTime uint32
	// Serialized buffer of the batch, starts from NumRows, does not contain the 4-byte
	// buffer size. Column vectors in columns point into this buffer.
	buffer []byte
	// When records are extracted for backfill, buffer is no longer used and we
	// use alternativeBytes to track the memory usage of the per-column slices.
	alternativeBytes int
	// Columns to upsert on.
	columns []*columnReader
	// Column id maps the logic column id to local column index.
	columnsByID map[int]int
}
// GetBuffer returns the underlying buffer used to construct the upsert batch.
// It is nil for batches produced by ExtractBackfillBatch.
func (u *UpsertBatch) GetBuffer() []byte {
	return u.buffer
}
// GetAlternativeBytes returns alternativeBytes, the memory accounted for the
// column slices of a batch produced by ExtractBackfillBatch (0 otherwise).
func (u *UpsertBatch) GetAlternativeBytes() int {
	return u.alternativeBytes
}
// GetColumnLen returns the number of columns in the batch.
func (u *UpsertBatch) GetColumnLen() int {
	return len(u.columns)
}
// GetColumMode returns the ColumnMode of the given column index; the caller is
// responsible for bounds checking. (The name keeps its historical "Colum"
// spelling for API compatibility.)
func (u *UpsertBatch) GetColumMode(col int) ColumnMode {
	return u.columns[col].columnMode
}
// GetColumnUpdateMode returns the ColumnUpdateMode of the given column index;
// the caller is responsible for bounds checking.
func (u *UpsertBatch) GetColumnUpdateMode(col int) ColumnUpdateMode {
	return u.columns[col].columnUpdateMode
}
// ReadGoValue returns the GoDataValue stored at (row, col); the caller is
// responsible for bounds checking.
func (u *UpsertBatch) ReadGoValue(row, col int) GoDataValue {
	return u.columns[col].ReadGoValue(row)
}
// GetColumnID returns the logical id of a column, or an error when the column
// index is out of range.
func (u *UpsertBatch) GetColumnID(col int) (int, error) {
	// Robustness: also reject negative indexes, which would otherwise panic.
	if col < 0 || col >= len(u.columns) {
		return 0, utils.StackError(nil, "Column index %d out of range %d", col, len(u.columns))
	}
	return u.columns[col].columnID, nil
}
// GetColumnType returns the data type of a column, or an error when the column
// index is out of range.
func (u *UpsertBatch) GetColumnType(col int) (DataType, error) {
	// Robustness: also reject negative indexes, which would otherwise panic.
	if col < 0 || col >= len(u.columns) {
		return 0, utils.StackError(nil, "Column index %d out of range %d", col, len(u.columns))
	}
	return u.columns[col].dataType, nil
}
// GetColumnIndex returns the local index of a column given a logical index id.
func (u *UpsertBatch) GetColumnIndex(columnID int) (int, error) {
	if col, ok := u.columnsByID[columnID]; ok {
		return col, nil
	}
	return 0, utils.StackError(nil, "Column %d does not exist", columnID)
}
// GetValue returns the data (fixed sized) stored at (row, col), including the pointer
// to the data, and the validity of the value.
func (u *UpsertBatch) GetValue(row int, col int) (unsafe.Pointer, bool, error) {
	if col < 0 || col >= len(u.columns) {
		return nil, false, utils.StackError(nil, "Column index %d out of range %d", col, len(u.columns))
	}
	if row < 0 || row >= u.NumRows {
		// Bug fix: the error previously reported col instead of the offending row.
		return nil, false, utils.StackError(nil, "Row index %d out of range %d", row, u.NumRows)
	}
	data, validity := u.columns[col].ReadValue(row)
	return data, validity, nil
}
// GetBool returns the data (boolean type) stored at (row, col), and the validity of the value.
func (u *UpsertBatch) GetBool(row int, col int) (bool, bool, error) {
	if col < 0 || col >= len(u.columns) {
		return false, false, utils.StackError(nil, "Column index %d out of range %d", col, len(u.columns))
	}
	if row < 0 || row >= u.NumRows {
		// Bug fix: the error previously reported col instead of the offending row.
		return false, false, utils.StackError(nil, "Row index %d out of range %d", row, u.NumRows)
	}
	data, validity := u.columns[col].ReadBool(row)
	return data, validity, nil
}
// GetDataValue returns the DataValue for the given row and col index.
// It first checks validity of the value, then checks whether it's a
// boolean column to decide whether to load bool value or other value
// type. Out-of-bound (row, col) yields NullDataValue.
// Users of GetDataValue should check row, col using NumRows and NumColumns.
func (u *UpsertBatch) GetDataValue(row, col int) DataValue {
	return u.GetDataValueWithDefault(row, col, NullDataValue)
}
// GetDataValueWithDefault gets the data value at the given row and col and
// returns defaultVal when (row, col) is out of bound.
func (u *UpsertBatch) GetDataValueWithDefault(row, col int, defaultVal DataValue) DataValue {
	val := defaultVal
	// Robustness: also reject negative indexes, which would otherwise panic
	// inside the column readers below.
	if row < 0 || col < 0 || col >= u.NumColumns || row >= u.NumRows {
		return val
	}

	dataType := u.columns[col].dataType
	val.DataType = dataType

	// Booleans are bit-packed and need the dedicated reader.
	if dataType == Bool {
		val.IsBool = true
		val.BoolVal, val.Valid = u.columns[col].ReadBool(row)
		return val
	}

	// Variable-length go types deserialize into a GoDataValue; a nil result
	// means the value is absent or failed to decode.
	if IsGoType(dataType) {
		val.GoVal = u.columns[col].ReadGoValue(row)
		val.Valid = val.GoVal != nil
		return val
	}

	// Fixed-size (and array) values are exposed as a raw pointer plus validity.
	val.OtherVal, val.Valid = u.columns[col].ReadValue(row)
	val.CmpFunc = GetCompareFunc(dataType)
	return val
}
// GetEventColumnIndex returns the column index of event time (logical column
// id 0), or -1 when the batch does not contain it.
func (u *UpsertBatch) GetEventColumnIndex() int {
	for col := 0; col < u.NumColumns; col++ {
		// Column id 0 is the event time column by convention.
		if id, _ := u.GetColumnID(col); id == 0 {
			return col
		}
	}
	return -1
}
// GetPrimaryKeyCols converts primary key columnIDs to cols in this upsert batch.
// It fails if any primary key column is missing from the batch.
func (u *UpsertBatch) GetPrimaryKeyCols(primaryKeyColumnIDs []int) ([]int, error) {
	cols := make([]int, len(primaryKeyColumnIDs))
	for i, id := range primaryKeyColumnIDs {
		col, err := u.GetColumnIndex(id)
		if err != nil {
			return nil, utils.StackError(err, "Primary key column %d is missing from upsert batch", id)
		}
		cols[i] = col
	}
	return cols, nil
}
// ExtractBackfillBatch extracts given rows and stores in a new UpsertBatch.
// The returned new UpsertBatch is not fully serialized and can only be used for
// structured reads. Columns with conditional update modes are dropped.
// Returns nil when backfillRows is empty.
func (u *UpsertBatch) ExtractBackfillBatch(backfillRows []int) *UpsertBatch {
	if len(backfillRows) == 0 {
		return nil
	}

	newBatch := *u
	newBatch.NumRows = len(backfillRows)
	newBatch.buffer = nil
	newBatch.columns = make([]*columnReader, 0, len(u.columns))
	// Bug fix: the shallow struct copy above shares columnsByID with u, and the
	// previous code mutated it in place (delete/reassign), corrupting the source
	// batch's column index. Build a fresh map for the extracted batch instead.
	newBatch.columnsByID = make(map[int]int, len(u.columnsByID))

	for _, oldCol := range u.columns {
		if oldCol.columnUpdateMode > UpdateForceOverwrite {
			// Ignore those columns with conditional updates from backfill.
			newBatch.NumColumns--
			continue
		}

		// Copy the reader metadata but detach all data vectors; they are
		// rebuilt below to cover only the extracted rows.
		newCol := *oldCol
		newCol.valueVector = nil
		newCol.nullVector = nil
		newCol.offsetVector = nil

		switch newCol.columnMode {
		case AllValuesDefault:
			// No data vectors to copy.
		case HasNullVector:
			// Rebuild a compact bit-packed null vector for the extracted rows.
			nullVectorLength := utils.AlignOffset(newBatch.NumRows, 8) / 8
			newCol.nullVector = make([]byte, nullVectorLength)
			newBatch.alternativeBytes += nullVectorLength
			for newRow, oldRow := range backfillRows {
				writeBool(newCol.nullVector, newRow, oldCol.readValidity(oldRow))
			}
			fallthrough
		case AllValuesPresent:
			// Rebuild the fixed-size value vector for the extracted rows.
			valueBits := DataTypeBits(newCol.dataType)
			valueVectorLength := utils.AlignOffset(newBatch.NumRows*valueBits, 8) / 8
			newCol.valueVector = make([]byte, valueVectorLength)
			newBatch.alternativeBytes += valueVectorLength
			for newRow, oldRow := range backfillRows {
				if valueBits == 1 {
					// Bit-packed booleans need bit-level copy.
					writeBool(newCol.valueVector, newRow, readBool(oldCol.valueVector, oldRow))
				} else {
					valueBytes := valueBits / 8
					utils.MemCopy(unsafe.Pointer(&newCol.valueVector[newRow*valueBytes]),
						unsafe.Pointer(&oldCol.valueVector[oldRow*valueBytes]), valueBytes)
				}
			}
		}

		newBatch.columnsByID[newCol.columnID] = len(newBatch.columns)
		newBatch.columns = append(newBatch.columns, &newCol)
	}
	return &newBatch
}
// GetColumnNames reads columnNames in UpsertBatch, user should not lock schema.
// The schema read lock is held for the duration of the call.
func (u *UpsertBatch) GetColumnNames(schema *TableSchema) ([]string, error) {
	columnNames := make([]string, u.NumColumns)
	schema.RLock()
	defer schema.RUnlock()
	for columnIdx := range u.columns {
		columnID, err := u.GetColumnID(columnIdx)
		if err != nil {
			return nil, err
		}
		// Bug fix: the bound check must be >= — columnID == len(Columns) would
		// previously slip through and panic on the index below.
		if columnID >= len(schema.Schema.Columns) {
			return nil, utils.StackError(nil, "Column id %d out of range %d",
				columnID, len(schema.Schema.Columns))
		}
		columnNames[columnIdx] = schema.Schema.Columns[columnID].Name
	}
	return columnNames, nil
}
// ReadData reads data from upsert batch and convert values to meaningful
// representations given data type. It returns up to length rows starting at
// start, clamped to the number of rows available.
func (u *UpsertBatch) ReadData(start int, length int) ([][]interface{}, error) {
	// Only read the column names.
	if length == 0 {
		return nil, nil
	}
	// Robustness: a negative start would previously survive the clamp below and
	// panic inside the column readers.
	if start < 0 {
		return nil, utils.StackError(nil, "Invalid start or length")
	}
	length = int(math.Min(float64(length), float64(u.NumRows-start)))
	if length <= 0 {
		return nil, utils.StackError(nil, "Invalid start or length")
	}

	// Column types are loop-invariant; fetch them once instead of per cell.
	dataTypes := make([]DataType, u.NumColumns)
	for col := 0; col < u.NumColumns; col++ {
		dataType, err := u.GetColumnType(col)
		if err != nil {
			return nil, err
		}
		dataTypes[col] = dataType
	}

	rows := make([][]interface{}, length)
	for row := start; row < start+length; row++ {
		idx := row - start
		rows[idx] = make([]interface{}, u.NumColumns)
		for col := 0; col < u.NumColumns; col++ {
			val := u.GetDataValue(row, col)
			rows[idx][col] = val.ConvertToHumanReadable(dataTypes[col])
		}
	}
	return rows, nil
}
// readUpsertBatch deserializes a V1 upsert batch from buffer (which begins at
// the version number and excludes the leading 4-byte buffer size). The returned
// batch's column vectors are views into buffer; no column data is copied.
// See the UpsertBatch type comment for the full serialized layout.
func readUpsertBatch(buffer []byte) (*UpsertBatch, error) {
	batch := &UpsertBatch{
		buffer:      buffer,
		columnsByID: make(map[int]int),
	}

	// numRows: int32 at offset 4, right after the version number.
	reader := utils.NewBufferReader(buffer)
	numRows, err := reader.ReadInt32(4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of rows")
	}
	if numRows < 0 {
		return nil, utils.StackError(err, "Number of rows should be >= 0")
	}
	batch.NumRows = int(numRows)

	// numColumns: uint16 at offset 8.
	numColumns, err := reader.ReadUint16(4 + 4)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read number of columns")
	}
	batch.NumColumns = int(numColumns)

	// arrival time: uint32 after the 2-byte num columns plus 14 reserved bytes.
	arrivalTime, err := reader.ReadUint32(4 + 4 + 2 + 14)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read arrival time")
	}
	batch.ArrivalTime = arrivalTime

	// Header too small, error out.
	if len(buffer) < 28+ColumnHeaderSize(batch.NumColumns) {
		return nil, utils.StackError(nil, "Invalid upsert batch data with incomplete header section")
	}

	// The per-column header section starts right after the 28-byte fixed header.
	header := NewUpsertBatchHeader(buffer[28:], batch.NumColumns)
	columns := make([]*columnReader, batch.NumColumns)
	for i := range columns {
		columnType, err := header.ReadColumnType(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read type for column %d", i)
		}
		columnID, err := header.ReadColumnID(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read id for column %d", i)
		}
		batch.columnsByID[columnID] = i
		columnMode, columnUpdateMode, err := header.ReadColumnFlag(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read mode for column %d", i)
		}
		columns[i] = &columnReader{columnID: columnID, columnMode: columnMode, columnUpdateMode: columnUpdateMode, dataType: columnType,
			cmpFunc: GetCompareFunc(columnType)}

		// Column i's data lives in buffer[columnStartOffset:columnEndOffset);
		// there are numColumns+1 offsets, so i+1 is always valid.
		columnStartOffset, err := header.ReadColumnOffset(i)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read start offset for column %d", i)
		}
		columnEndOffset, err := header.ReadColumnOffset(i + 1)
		if err != nil {
			return nil, utils.StackError(err, "Failed to read end offset for column %d", i)
		}
		isGoType := IsGoType(columnType)
		isArrayType := IsArrayType(columnType)
		currentOffset := columnStartOffset
		switch columnMode {
		case AllValuesDefault:
			// No vectors are stored for this column.
		case HasNullVector:
			if !isGoType {
				// Null vector points to the beginning of the column data section.
				nullVectorLength := utils.AlignOffset(batch.NumRows, 8) / 8
				columns[i].nullVector = buffer[currentOffset : currentOffset+nullVectorLength]
				currentOffset += nullVectorLength
			}
			// HasNullVector columns also carry the same vectors as
			// AllValuesPresent ones.
			fallthrough
		case AllValuesPresent:
			// Variable-length values (go types and arrays) carry a 4-byte-aligned
			// offset vector with numRows+1 uint32 entries.
			if isGoType || isArrayType {
				currentOffset = utils.AlignOffset(currentOffset, 4)
				offsetVectorLength := (batch.NumRows + 1) * 4
				columns[i].offsetVector = buffer[currentOffset : currentOffset+offsetVectorLength]
				currentOffset += offsetVectorLength
			}
			// Round up to 8 byte padding.
			currentOffset = utils.AlignOffset(currentOffset, 8)
			columns[i].valueVector = buffer[currentOffset:columnEndOffset]
		}
	}
	batch.columns = columns
	return batch, nil
}
// NewUpsertBatch deserializes an upsert batch on the server.
// buffer does not contain the 4-byte buffer size.
func NewUpsertBatch(buffer []byte) (*UpsertBatch, error) {
	// The version number is the first uint32 of the buffer.
	reader := utils.NewBufferReader(buffer)
	version, err := reader.ReadUint32(0)
	if err != nil {
		return nil, utils.StackError(err, "Failed to read upsert batch version number")
	}
	switch UpsertBatchVersion(version) {
	case V1:
		return readUpsertBatch(buffer)
	default:
		return nil, utils.StackError(nil, "Unsupported upsert batch version %x", version)
	}
}