memstore/common/batch.go (72 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package common import ( "fmt" "os" "sync" ) // BatchReader defines the interface to retrieve a DataValue given a row index and // column index. type BatchReader interface { GetDataValue(row, columnID int) DataValue GetDataValueWithDefault(row, columnID int, defaultValue DataValue) DataValue } // Batch represents a sorted or live batch. type Batch struct { // Batch mutex is locked in reader mode by queries during the entire transfer // to ensure row level consistent read. It is locked in writer mode only for // updates from ingestion, and for modifications to the columns slice itself // (e.g., adding new columns). Appends will update LastReadBatchID and // NumRecordsInLastWriteBatch to make newly added records visible only at the last // step, therefore the batch does not need to be locked for appends. // For sorted bathes this is also locked in writer mode for initiating loading // from disk (vector party creation, Loader/Users initialization), and for // vector party detaching during eviction. *sync.RWMutex // For live batches, index out of bound and nil VectorParty indicates // mode 0 for the corresponding VectorParty. // For archive batches, index out of bound and nil VectorParty indicates that // the corresponding VectorParty has not been loaded into memory from disk. Columns []VectorParty } // GetVectorParty returns the VectorParty for the specified column from // the batch. It requires the batch to be locked for reading. func (b *Batch) GetVectorParty(columnID int) VectorParty { if columnID >= len(b.Columns) { return nil } return b.Columns[columnID] } // GetDataValue read value from underlying columns. func (b *Batch) GetDataValue(row, columnID int) DataValue { if columnID >= len(b.Columns) { return NullDataValue } vp := b.Columns[columnID] if vp == nil { return NullDataValue } return vp.GetDataValue(row) } // GetDataValueWithDefault read value from underlying columns and if it's missing, it will return // passed value instead. func (b *Batch) GetDataValueWithDefault(row, columnID int, defaultValue DataValue) DataValue { if columnID >= len(b.Columns) { return defaultValue } vp := b.Columns[columnID] if vp == nil { return defaultValue } return vp.GetDataValue(row) } // SafeDestruct destructs all vector parties of this batch. func (b *Batch) SafeDestruct() { if b != nil { for _, col := range b.Columns { if col != nil { col.SafeDestruct() } } } } // Equals check whether two batches are the same. Notes both batches should have all its columns loaded into memory // before comparison. Therefore this function should be only called for unit test purpose. func (b *Batch) Equals(other *Batch) bool { if b == nil || other == nil { return b == nil && other == nil } if len(b.Columns) != len(other.Columns) { return false } for columnID := range b.Columns { if !VectorPartyEquals(b.Columns[columnID], other.Columns[columnID]) { return false } } return true } func (b *Batch) Dump(file *os.File) { fmt.Fprintf(file, "Dump Batch, columns: %d\n", len(b.Columns)) for i, col := range b.Columns { fmt.Fprintf(file, "col: %d\n", i) if col != nil { col.Dump(file) } } }