// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memstore

import (
"encoding/json"
"strconv"
"sync"

"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/memstore/list"
"github.com/uber/aresdb/utils"
)

// ArchiveBatch represents an archive batch.
type ArchiveBatch struct {
common.Batch
// Size of the batch (number of rows). Notice that compression may change the
// length of some columns, but it does not change the size of the batch.
Size int
// Version for archive batches.
Version uint32
// SeqNum denotes backfill sequence number
SeqNum uint32
// For convenience.
BatchID int32
Shard *TableShard
}

// ArchiveStoreVersion stores a version of archive batches of columnar data.
type ArchiveStoreVersion struct {
// The mutex protects the Batches map structure and the archiving cutoff
// field. It does not protect contents within a batch. Before releasing this
// mutex, users should lock the batch-level mutex if necessary to ensure
// proper protection at the batch level.
sync.RWMutex `json:"-"`
// Wait group used to prevent this ArchiveStoreVersion from being evicted
// while it is still in use.
Users sync.WaitGroup `json:"-"`
// Each batch in the map is identified by its batch ID.
// A missing entry or a nil Batch for an archive batch indicates that none of
// the columns have been loaded into memory from disk.
Batches map[int32]*ArchiveBatch `json:"batches"`
// The archiving cutoff used for this version of the archive store.
ArchivingCutoff uint32 `json:"archivingCutoff"`
// For convenience.
shard *TableShard
}

// ArchiveStore manages versions of archive batches.
// The archive store evolves to a new version after each archiving run.
// Readers should follow this locking protocol:
//
//	version := archiveStore.GetCurrentVersion()
//	// tableShard.ArchiveStore can no longer be accessed directly;
//	// continue reading from version, then release it:
//	version.Users.Done()
type ArchiveStore struct {
// The mutex protects the pointer to the current archive store version.
sync.RWMutex
PurgeManager *PurgeManager
// CurrentVersion points to the most recent archive store version for queries to use.
CurrentVersion *ArchiveStoreVersion
}

// NewArchiveStore creates a new archive store. The current version is just a
// placeholder for tests; it will be replaced during recovery.
func NewArchiveStore(shard *TableShard) *ArchiveStore {
return &ArchiveStore{
PurgeManager: NewPurgeManager(shard),
CurrentVersion: NewArchiveStoreVersion(0, shard),
}
}

// NewArchiveStoreVersion creates a new empty archive store version with the given cutoff.
func NewArchiveStoreVersion(cutoff uint32, shard *TableShard) *ArchiveStoreVersion {
return &ArchiveStoreVersion{
Batches: make(map[int32]*ArchiveBatch),
ArchivingCutoff: cutoff,
shard: shard,
}
}

// Destruct deletes all vectors allocated in C memory.
// The caller must detach the Shard first and wait until all users are finished.
func (s *ArchiveStore) Destruct() {
if s.CurrentVersion == nil {
return
}
shard := s.CurrentVersion.shard
for _, batch := range s.CurrentVersion.Batches {
for columnID, vp := range batch.Columns {
if vp != nil {
vp.SafeDestruct()
shard.HostMemoryManager.ReportManagedObject(shard.Schema.Schema.Name, shard.ShardID,
int(batch.BatchID), columnID, 0)
}
}
}
}

// RequestBatch returns the requested archive batch from the archive store
// version. If the batch is not present yet, its version, sequence number, and
// size are read from MetaStore, and an empty batch (with no columns loaded)
// is created and cached.
func (v *ArchiveStoreVersion) RequestBatch(batchID int32) *ArchiveBatch {
v.Lock()
defer v.Unlock()
batch, ok := v.Batches[batchID]
if ok {
return batch
}
// Read version and size from MetaStore.
version, seqNum, size, err := v.shard.metaStore.GetArchiveBatchVersion(
v.shard.Schema.Schema.Name, v.shard.ShardID, int(batchID), v.ArchivingCutoff)
if err != nil {
utils.GetLogger().With(
"table", v.shard.Schema.Schema.Name,
"shard", v.shard.ShardID,
"batchID", batchID).Panic(err)
}
batch = &ArchiveBatch{
Version: version,
SeqNum: seqNum,
Size: size,
BatchID: batchID,
Shard: v.shard,
Batch: common.Batch{RWMutex: &sync.RWMutex{}},
}
v.Batches[batchID] = batch
return batch
}

// WriteToDisk writes each column of a batch to disk. It happens during the
// archiving stage for the merged archive batch, so there is no need to lock it.
func (b *ArchiveBatch) WriteToDisk() error {
for columnID, column := range b.Columns {
serializer := common.NewVectorPartyArchiveSerializer(
b.Shard.HostMemoryManager, b.Shard.diskStore, b.Shard.Schema.Schema.Name, b.Shard.ShardID, columnID, int(b.BatchID), b.Version, b.SeqNum)
if err := serializer.WriteVectorParty(column); err != nil {
return err
}
}
return nil
}

// GetCurrentVersion returns the current ArchiveStoreVersion and does proper
// locking. It's used by query and data browsing. Users need to call
// version.Users.Done() after their work.
func (s *ArchiveStore) GetCurrentVersion() *ArchiveStoreVersion {
s.RLock()
version := s.CurrentVersion
version.Users.Add(1)
s.RUnlock()
return version
}
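
// Example (illustrative sketch, not part of the original source): a reader
// on the query path pairs GetCurrentVersion with Users.Done:
//
//	version := shard.ArchiveStore.GetCurrentVersion()
//	defer version.Users.Done()
//	batch := version.RequestBatch(batchID)
//	// ... read from batch ...
//
// shard (*TableShard) and batchID (int32) are assumed to be held by the caller.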

// MarshalJSON marshals an ArchiveStore into json.
func (s *ArchiveStore) MarshalJSON() ([]byte, error) {
currentVersion := s.GetCurrentVersion()
defer currentVersion.Users.Done()
return json.Marshal(map[string]interface{}{
"currentVersion": currentVersion,
})
}

// MarshalJSON marshals an ArchiveStoreVersion into json.
func (v *ArchiveStoreVersion) MarshalJSON() ([]byte, error) {
// Use a type alias to avoid json.Marshal calling this method recursively.
type alias ArchiveStoreVersion
v.RLock()
defer v.RUnlock()
return json.Marshal((*alias)(v))
}

// RequestVectorParty creates (if necessary), pins, and returns the requested
// vector party. On creation it also asynchronously loads the vector party
// from disk into memory.
//
// Caller must call vp.WaitForDiskLoad() before using it,
// and call vp.Release() afterwards.
func (b *ArchiveBatch) RequestVectorParty(columnID int) common.ArchiveVectorParty {
b.Lock()
defer b.Unlock()
// This is a newly added column. We need to allocate more space to put this column.
if columnID >= len(b.Columns) {
b.Columns = append(b.Columns, make([]common.VectorParty, columnID-len(b.Columns)+1)...)
}
// Columns of an archive batch always store archive vector parties.
vp := b.Columns[columnID]
if vp != nil {
// Pin the vector party and return; the caller waits for it to be loaded
// after the batch lock is released.
archiveVP := vp.(common.ArchiveVectorParty)
archiveVP.Pin()
return archiveVP
}
// Set a placeholder to prevent repetitive loading.
// columnID should always be smaller than len(ValueTypeByColumn).
dataType := b.Shard.Schema.ValueTypeByColumn[columnID]
defaultValue := b.Shard.Schema.DefaultValues[columnID]
if common.IsArrayType(dataType) {
vp = list.NewArchiveVectorParty(b.Size, dataType, 0, b.RWMutex)
} else {
vp = newArchiveVectorParty(b.Size, dataType, *defaultValue, b.RWMutex)
}
b.Columns[columnID] = vp
archiveVP := vp.(common.ArchiveVectorParty)
archiveVP.Pin()
archiveVP.LoadFromDisk(b.Shard.HostMemoryManager, b.Shard.diskStore, b.Shard.Schema.Schema.Name, b.Shard.ShardID, columnID, int(b.BatchID), b.Version, b.SeqNum)
return archiveVP
}
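
// Example (illustrative sketch, not part of the original source): per the
// contract above, a caller pins the vector party, waits for the disk load,
// and releases it when done:
//
//	vp := batch.RequestVectorParty(columnID)
//	vp.WaitForDiskLoad()
//	defer vp.Release()
//	// ... read from vp ...
//
// batch (*ArchiveBatch) and columnID (int) are assumed to come from the
// caller, e.g. via RequestBatch.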

// TryEvict attempts to evict and destruct the specified column from the
// archive batch. It will fail fast if the column is currently in use so that
// the host memory manager can try evicting other VPs immediately.
// Returns the vector party evicted if it succeeded.
func (b *ArchiveBatch) TryEvict(columnID int) common.ArchiveVectorParty {
return b.evict(columnID, false)
}

// BlockingDelete blocks until all users are finished with the specified
// column, and then deletes the column from the batch.
// Returns the vector party deleted, if any.
func (b *ArchiveBatch) BlockingDelete(columnID int) common.ArchiveVectorParty {
return b.evict(columnID, true)
}

// evict attempts to evict and destruct the specified column from the archive
// batch. It will block if blocking is set to true; otherwise it will fail fast.
func (b *ArchiveBatch) evict(columnID int, blocking bool) common.ArchiveVectorParty {
b.Lock()
defer b.Unlock()
if columnID >= len(b.Columns) {
return nil
}
rawVP := b.Columns[columnID]
if rawVP == nil {
return nil
}
vp := rawVP.(common.ArchiveVectorParty)
if !vp.WaitForUsers(blocking) {
return nil
}
b.Columns[columnID] = nil
vp.SafeDestruct()
b.Shard.HostMemoryManager.ReportManagedObject(
b.Shard.Schema.Schema.Name, b.Shard.ShardID, int(b.BatchID), columnID, 0)
return vp
}
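
// Example (illustrative sketch, not part of the original source): an eviction
// pass driven by the host memory manager can fail fast on pinned columns and
// move on to other candidates:
//
//	if evicted := batch.TryEvict(columnID); evicted == nil {
//		// Column is in use (or not loaded); try the next candidate.
//	}
//
// batch and columnID are placeholders for whatever the memory manager
// iterates over.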

// GetBatchForRead returns an ArchiveBatch for read;
// the reader needs to unlock (RUnlock) it after use.
func (v *ArchiveStoreVersion) GetBatchForRead(batchID int) *ArchiveBatch {
batch := v.Batches[int32(batchID)]
if batch != nil {
batch.RLock()
}
return batch
}
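
// Example (illustrative sketch, not part of the original source): the read
// lock taken by GetBatchForRead must be released by the caller:
//
//	if batch := version.GetBatchForRead(batchID); batch != nil {
//		defer batch.RUnlock()
//		// ... read from batch ...
//	}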

// MarshalJSON marshals an ArchiveBatch into json.
func (b *ArchiveBatch) MarshalJSON() ([]byte, error) {
b.RLock()
defer b.RUnlock()
return json.Marshal(map[string]interface{}{
"numColumns": len(b.Columns),
"size": b.Size,
"version": b.Version,
})
}

// BuildIndex builds an index over the primary key columns of this archive
// batch and inserts the record IDs into the given primary key.
func (b *ArchiveBatch) BuildIndex(sortColumns []int, primaryKeyColumns []int, pk common.PrimaryKey) error {
if b == nil {
return nil
}
key := make([]byte, b.Shard.Schema.PrimaryKeyBytes)
var err error
primaryKeyValues := make([]common.DataValue, len(primaryKeyColumns))
// We need to use sortedColumnIterators to advance over sort columns.
sortedColumnIterators := make([]sortedColumnIterator, len(primaryKeyColumns))
// Create sort column iterators when the primary key column is also a sort column.
for i, primaryKeyColumnID := range primaryKeyColumns {
if utils.IndexOfInt(sortColumns, primaryKeyColumnID) >= 0 {
sortedColumnIterators[i] = newArchiveBatchColumnIterator(b, primaryKeyColumnID, nil)
sortedColumnIterators[i].setEndPosition(uint32(b.Size))
}
}
numDuplicateRecords := 0
for row := 0; row < b.Size; row++ {
// Prepare primary key values.
for i, primaryKeyColumnID := range primaryKeyColumns {
if sortedColumnIterators[i] != nil {
if uint32(row) >= sortedColumnIterators[i].nextPosition() {
sortedColumnIterators[i].next()
}
primaryKeyValues[i] = sortedColumnIterators[i].value()
} else {
// Primary key column will not have any default values.
primaryKeyValues[i] = b.GetDataValue(row, primaryKeyColumnID)
}
if !primaryKeyValues[i].Valid {
return utils.StackError(nil, "Primary key col is null at row %d col %d "+
"batchID %d batch %v",
row, primaryKeyColumnID, b.BatchID, b)
}
}
// Build the primary key for this record.
// Truncate the key buffer before reuse.
key = key[:0]
if key, err = common.AppendPrimaryKeyBytes(key, common.NewSliceDataValueIterator(primaryKeyValues)); err != nil {
return err
}
existing, _, err := pk.FindOrInsert(key, common.RecordID{BatchID: b.BatchID, Index: uint32(row)}, 0)
if err != nil {
return err
}
if existing {
// Finding a duplicate record during backfill is a data correctness issue:
// a new update will only go to one of the records, depending on the sort
// order. For now we let this rare case proceed but trigger an alert, so
// that the user can adjust the schema and backfill data when needed,
// instead of crashing the server completely.
numDuplicateRecords++
}
}
if numDuplicateRecords > 0 {
utils.GetLogger().With(
"table", b.Shard.Schema.Schema.Name,
"shard", b.Shard.ShardID,
"batch", b.BatchID,
"error", "duplicate record",
).Errorf("duplicate record found when building index")
utils.GetReporter(b.Shard.Schema.Schema.Name, b.Shard.ShardID).GetChildGauge(map[string]string{
"batch": strconv.FormatInt(int64(b.BatchID), 10),
}, utils.DuplicateRecordRatio).Update(float64(numDuplicateRecords) / float64(b.Size))
}
return nil
}

// Clone returns a copy of the current batch, including all references to the
// underlying vector parties. The caller is responsible for holding the lock
// (if necessary).
func (b *ArchiveBatch) Clone() *ArchiveBatch {
newBatch := &ArchiveBatch{
Batch: common.Batch{
RWMutex: b.Batch.RWMutex,
Columns: make([]common.VectorParty, len(b.Columns)),
},
Version: b.Version,
SeqNum: b.SeqNum,
Size: b.Size,
BatchID: b.BatchID,
Shard: b.Shard,
}
copy(newBatch.Columns, b.Columns)
return newBatch
}
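
// Example (illustrative sketch, not part of the original source): since Clone
// itself takes no locks, a caller that may race with writers can hold the
// batch read lock for the duration of the copy:
//
//	b.RLock()
//	snapshot := b.Clone()
//	b.RUnlock()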

// UnpinVectorParties unpins all vector parties in the slice.
func UnpinVectorParties(requestedVPs []common.ArchiveVectorParty) {
for _, vp := range requestedVPs {
vp.Release()
}
}
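
// Example (illustrative sketch, not part of the original source): a query
// that pins one vector party per requested column can release them all at
// once when it finishes:
//
//	requestedVPs := make([]common.ArchiveVectorParty, 0, len(columnIDs))
//	for _, columnID := range columnIDs {
//		vp := batch.RequestVectorParty(columnID)
//		vp.WaitForDiskLoad()
//		requestedVPs = append(requestedVPs, vp)
//	}
//	defer UnpinVectorParties(requestedVPs)
//
// columnIDs and batch are placeholders for the query's column plan and its
// requested batch.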