memstore/live_store.go
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"encoding/json"
"github.com/uber/aresdb/memstore/vectors"
"math"
"sync"
"time"
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/redolog"
"github.com/uber/aresdb/utils"
)
// BaseBatchID is the starting id of all batches.
const BaseBatchID = int32(math.MinInt32)
// LiveBatch represents a live batch.
type LiveBatch struct {
// The common data structure holding column data.
common.Batch
// Capacity of the batch which is decided at the creation time.
Capacity int
// For convenience to access fields of live store.
// Schema locks should be acquired after data locks.
liveStore *LiveStore
// Maximum arrival time of records in this batch.
MaxArrivalTime uint32
}
// LiveStore stores live batches of columnar data.
type LiveStore struct {
sync.RWMutex
// Following fields are protected by above mutex.
// The batch id to batch map.
Batches map[int32]*LiveBatch
// Number of rows to create for new batches.
BatchSize int
// The upper bound of records (exclusive) that can be read by queries.
LastReadRecord common.RecordID
// This is the in memory archiving cutoff time high watermark that gets set by the archiving job
// before each archiving run. Ingestion will not insert/update records that are older than
// the archiving cutoff watermark.
ArchivingCutoffHighWatermark uint32
// Logs.
RedoLogManager redolog.RedologManager
// Manage backfill queue during ingestion.
BackfillManager *BackfillManager
// Manage snapshot related stats.
SnapshotManager *SnapshotManager
// For convenience. Schema locks should be acquired after data locks.
tableSchema *common.TableSchema
// The writer lock is to guarantee single writer to a Shard at all time. To ensure this, writers
// (ingestion, archiving etc) need to hold this lock at all times. This lock
// should be acquired before the VectorStore and Batch locks.
// TODO: if spinning lock performance is a concern we may need to upgrade this
// to a designated goroutine with a pc-queue channel.
WriterLock sync.RWMutex
// Following fields are protected by WriterLock.
// Primary key table of the Shard.
PrimaryKey common.PrimaryKey
// The position of the next record to be used for writing. Only used by the ingester.
NextWriteRecord common.RecordID
// For convenience.
HostMemoryManager common.HostMemoryManager `json:"-"`
// Last modified time per column in the live store, fact tables only. Used to measure the data freshness of each column.
// Protected by the writer lock of the live store. If a column is never ingested, the last modified time will be zero.
// Metrics will be emitted after each ingestion request.
lastModifiedTimePerColumn []uint32
}
// NewLiveStore creates a new live store for the given table shard.
func NewLiveStore(batchSize int, totalShards int, shard *TableShard) *LiveStore {
schema := shard.Schema
tableCfg := schema.Schema.Config
// for now, dimension tables are unsharded
unsharded := !schema.Schema.IsFactTable
redoLogManager, err := shard.options.redoLogMaster.NewRedologManager(schema.Schema.Name, shard.ShardID, unsharded, &tableCfg)
if err != nil {
utils.GetLogger().Fatal(err)
}
ls := &LiveStore{
BatchSize: batchSize,
Batches: make(map[int32]*LiveBatch),
tableSchema: schema,
LastReadRecord: common.RecordID{BatchID: BaseBatchID, Index: 0},
NextWriteRecord: common.RecordID{BatchID: BaseBatchID, Index: 0},
PrimaryKey: NewPrimaryKey(schema.PrimaryKeyBytes, schema.Schema.IsFactTable,
// the initial number of primary key buckets should account for the number of shards
schema.Schema.Config.InitialPrimaryKeyNumBuckets/totalShards,
shard.HostMemoryManager),
RedoLogManager: redoLogManager,
HostMemoryManager: shard.HostMemoryManager,
}
if schema.Schema.IsFactTable {
ls.BackfillManager = NewBackfillManager(schema.Schema.Name, shard.ShardID, BackfillConfig{
// MaxBufferSize should account for the total number of shards
MaxBufferSize: tableCfg.BackfillMaxBufferSize / int64(totalShards),
BackfillThresholdInBytes: tableCfg.BackfillThresholdInBytes,
})
// Report memory usage of the backfill max buffer size.
ls.HostMemoryManager.ReportUnmanagedSpaceUsageChange(
int64(ls.BackfillManager.MaxBufferSize * utils.GolangMemoryFootprintFactor))
} else {
ls.SnapshotManager = NewSnapshotManager(shard)
}
return ls
}
// GetBatchIDs snapshots the batches and returns the list of batch ids available for read,
// along with the number of records in the last batch (batchIDs[len(batchIDs)-1]).
func (s *LiveStore) GetBatchIDs() (batchIDs []int32, numRecordsInLastBatch int) {
s.RLock()
for key, batch := range s.Batches {
if key < s.LastReadRecord.BatchID {
batchIDs = append(batchIDs, key)
numRecordsInLastBatch = batch.Capacity
}
}
if s.LastReadRecord.Index > 0 {
batchIDs = append(batchIDs, s.LastReadRecord.BatchID)
numRecordsInLastBatch = int(s.LastReadRecord.Index)
}
s.RUnlock()
return
}
// GetBatchForRead returns the batch with the given ID and read-locks it for reads. The caller must
// explicitly RUnlock() the returned batch after all reads.
func (s *LiveStore) GetBatchForRead(id int32) *LiveBatch {
s.RLock()
batch := s.Batches[id]
if batch != nil {
batch.RLock()
}
s.RUnlock()
return batch
}
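// The helper below is an illustrative sketch and not part of the original file: it shows the
// intended read pattern of snapshotting batch ids with GetBatchIDs, read-locking each batch
// through GetBatchForRead, and releasing it with RUnlock when done. The name
// countReadableRecordsExample is hypothetical.
func countReadableRecordsExample(s *LiveStore) int {
    batchIDs, numRecordsInLastBatch := s.GetBatchIDs()
    total := 0
    for i, batchID := range batchIDs {
        batch := s.GetBatchForRead(batchID)
        if batch == nil {
            // The batch may have been purged after the snapshot was taken.
            continue
        }
        if i == len(batchIDs)-1 {
            // Only the last batch may be partially readable.
            total += numRecordsInLastBatch
        } else {
            total += batch.Capacity
        }
        batch.RUnlock()
    }
    return total
}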
// GetBatchForWrite returns the batch with the given ID and write-locks it for writes. The caller must
// explicitly Unlock() the returned batch after all writes.
func (s *LiveStore) GetBatchForWrite(id int32) *LiveBatch {
s.RLock()
batch := s.Batches[id]
if batch != nil {
batch.Lock()
}
s.RUnlock()
return batch
}
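// The helper below is an illustrative sketch and not part of the original file: it follows the
// lock ordering documented on LiveStore, where a writer holds WriterLock before taking the store
// and batch locks via GetBatchForWrite. The name bumpMaxArrivalTimeExample is hypothetical.
func bumpMaxArrivalTimeExample(s *LiveStore, batchID int32, arrivalTime uint32) {
    // WriterLock guarantees a single writer to the shard; acquire it before any batch lock.
    s.WriterLock.Lock()
    defer s.WriterLock.Unlock()
    batch := s.GetBatchForWrite(batchID)
    if batch == nil {
        return
    }
    if arrivalTime > batch.MaxArrivalTime {
        batch.MaxArrivalTime = arrivalTime
    }
    batch.Unlock()
}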
// getOrCreateBatch retrieves the LiveBatch for the specified batchID, appending one if it does not exist.
// This is only used while reading snapshots; otherwise the batchID may cause conflicts.
// The caller should unlock the batch after use.
func (s *LiveStore) getOrCreateBatch(batchID int32) *LiveBatch {
batch := s.GetBatchForWrite(batchID)
if batch == nil {
batch = s.appendBatch(batchID)
batch.Lock()
}
return batch
}
// AdvanceNextWriteRecord reserves space for a record and returns the next available record position
// to the caller.
func (s *LiveStore) AdvanceNextWriteRecord() common.RecordID {
s.RLock()
batch := s.Batches[s.NextWriteRecord.BatchID]
s.RUnlock()
if batch == nil {
// We only create a batch when the current NextWriteRecord points to a nil batch.
batch = s.appendBatch(s.NextWriteRecord.BatchID)
}
if int(s.NextWriteRecord.Index)+1 == batch.Capacity {
s.NextWriteRecord.BatchID++
s.NextWriteRecord.Index = 0
} else {
s.NextWriteRecord.Index++
}
return s.NextWriteRecord
}
// AdvanceLastReadRecord advances the read high watermark (LastReadRecord) to the next write record, making the newly written rows visible to queries.
func (s *LiveStore) AdvanceLastReadRecord() {
s.Lock()
s.LastReadRecord = s.NextWriteRecord
s.Unlock()
}
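// The helper below is an illustrative sketch and not part of the original file: it shows the
// reserve-then-publish flow, where AdvanceNextWriteRecord reserves space (creating batches on
// demand) and AdvanceLastReadRecord makes the written rows visible to queries. The name
// reserveAndPublishExample is hypothetical; the actual value writes through the column vector
// parties are omitted.
func reserveAndPublishExample(s *LiveStore, numRecords int) {
    // NextWriteRecord is protected by WriterLock, so the writer must hold it here.
    s.WriterLock.Lock()
    defer s.WriterLock.Unlock()
    for i := 0; i < numRecords; i++ {
        record := s.AdvanceNextWriteRecord()
        _ = record // the record's column values would be written here
    }
    // Publish: queries can now read everything up to NextWriteRecord.
    s.AdvanceLastReadRecord()
}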
// appendBatch appends a new batch with the given ID and returns it. It returns nil if a batch with that ID already exists.
func (s *LiveStore) appendBatch(batchID int32) *LiveBatch {
if s.Batches[batchID] != nil {
return nil
}
valueTypeByColumn := s.tableSchema.GetValueTypeByColumn()
numColumns := len(valueTypeByColumn)
batch := &LiveBatch{
Batch: common.Batch{
RWMutex: &sync.RWMutex{},
Columns: make([]common.VectorParty, numColumns),
},
Capacity: s.BatchSize,
liveStore: s,
}
s.Lock()
s.Batches[batchID] = batch
s.Unlock()
return batch
}
// PurgeBatch purges the specified batch.
func (s *LiveStore) PurgeBatch(id int32) {
s.Lock()
// Detach first.
batch := s.Batches[id]
delete(s.Batches, id)
s.Unlock()
if batch != nil {
// Wait for readers to finish.
batch.Lock()
batch.Unlock()
// SafeDestruct.
for _, vp := range batch.Columns {
if vp != nil {
bytes := -vp.GetBytes()
vp.SafeDestruct()
s.HostMemoryManager.ReportUnmanagedSpaceUsageChange(bytes)
}
}
}
}
// Destruct deletes all vectors allocated in C.
// Caller must detach the Shard first and wait until all users are finished.
func (s *LiveStore) Destruct() {
s.PrimaryKey.Destruct()
for batchID := range s.Batches {
s.PurgeBatch(batchID)
}
// Free the memory used by the backfill manager and report the change.
if s.BackfillManager != nil {
s.BackfillManager.Destruct()
go func() {
// Delay 1 second before reporting the memory change, to wait for GC to happen.
timer := time.NewTimer(time.Second)
<-timer.C
s.HostMemoryManager.ReportUnmanagedSpaceUsageChange(
-int64(s.BackfillManager.MaxBufferSize * utils.GolangMemoryFootprintFactor))
}()
}
}
// PurgeBatches purges the specified batches.
func (s *LiveStore) PurgeBatches(ids []int32) {
for _, id := range ids {
s.PurgeBatch(id)
}
}
// MarshalJSON marshals a LiveStore into json.
func (s *LiveStore) MarshalJSON() ([]byte, error) {
// Build the JSON map manually to avoid recursive json.Marshal calls.
jsonMap := make(map[string]interface{})
// Following fields are protected by reader lock of liveStore.
s.RLock()
batchMapJSON, err := json.Marshal(s.Batches)
if err != nil {
s.RUnlock()
return nil, err
}
jsonMap["batches"] = json.RawMessage(batchMapJSON)
jsonMap["batchSize"] = s.BatchSize
jsonMap["lastReadRecord"] = s.LastReadRecord
jsonMap["lastModifiedTimePerColumn"] = s.lastModifiedTimePerColumn
redologManagerJSON, err := json.Marshal(&s.RedoLogManager)
if err != nil {
s.RUnlock()
return nil, err
}
jsonMap["redoLogManager"] = json.RawMessage(redologManagerJSON)
backfillManagerJSON, err := json.Marshal(s.BackfillManager)
if err != nil {
s.RUnlock()
return nil, err
}
jsonMap["backfillManager"] = json.RawMessage(backfillManagerJSON)
snapshotManagerJSON, err := json.Marshal(s.SnapshotManager)
if err != nil {
s.RUnlock()
return nil, err
}
jsonMap["snapshotManager"] = json.RawMessage(snapshotManagerJSON)
s.RUnlock()
// Following fields are protected by writer lock of liveStore.
s.WriterLock.RLock()
pkJSON, err := common.MarshalPrimaryKey(s.PrimaryKey)
if err != nil {
s.WriterLock.RUnlock()
return nil, err
}
jsonMap["primaryKey"] = json.RawMessage(pkJSON)
jsonMap["nextWriteRecord"] = s.NextWriteRecord
s.WriterLock.RUnlock()
return json.Marshal(jsonMap)
}
// LookupKey looks up the given key in the primary key table.
func (s *LiveStore) LookupKey(keyStrs []string) (common.RecordID, bool) {
key := make([]byte, s.tableSchema.PrimaryKeyBytes)
if len(s.tableSchema.PrimaryKeyColumnTypes) != len(keyStrs) {
return common.RecordID{}, false
}
index := 0
for colIndex, columnType := range s.tableSchema.PrimaryKeyColumnTypes {
dataValue, err := common.ValueFromString(keyStrs[colIndex], columnType)
if err != nil || !dataValue.Valid {
return common.RecordID{}, false
}
if dataValue.IsBool {
if dataValue.BoolVal {
key[index] = 1
} else {
key[index] = 0
}
index++
} else {
for i := 0; i < common.DataTypeBits(columnType)/8; i++ {
key[index] = *(*byte)(utils.MemAccess(dataValue.OtherVal, i))
index++
}
}
}
s.WriterLock.RLock()
defer s.WriterLock.RUnlock()
return s.PrimaryKey.Find(key)
}
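// The helper below is an illustrative sketch and not part of the original file: it looks up a
// row by its primary key values (as strings) and checks whether the row is already visible to
// queries, i.e. below the exclusive LastReadRecord watermark. The name lookupVisibleExample is
// hypothetical.
func lookupVisibleExample(s *LiveStore, keyStrs []string) (common.RecordID, bool) {
    recordID, found := s.LookupKey(keyStrs)
    if !found {
        return common.RecordID{}, false
    }
    s.RLock()
    lastRead := s.LastReadRecord
    s.RUnlock()
    visible := recordID.BatchID < lastRead.BatchID ||
        (recordID.BatchID == lastRead.BatchID && recordID.Index < lastRead.Index)
    return recordID, visible
}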
// GetMemoryUsageForColumn gets the live store memory usage for the column with the given data type.
func (s *LiveStore) GetMemoryUsageForColumn(valueType common.DataType, columnID int) int {
batchIDs, _ := s.GetBatchIDs()
var liveStoreMemory int
for _, batchID := range batchIDs {
liveBatch := s.GetBatchForRead(batchID)
if liveBatch == nil {
continue
}
if columnID < len(liveBatch.Columns) && liveBatch.Columns[columnID] != nil {
liveStoreMemory += int(liveBatch.Columns[columnID].GetBytes())
}
liveBatch.RUnlock()
}
return liveStoreMemory
}
// GetOrCreateVectorParty returns the LiveVectorParty for the specified column of the live batch,
// creating it if needed. locked specifies whether the caller already holds the batch lock.
// The lock will be left in the same state after the function returns.
func (b *LiveBatch) GetOrCreateVectorParty(columnID int, locked bool) common.LiveVectorParty {
// Ensure that columnID is not out of bound.
if columnID >= len(b.Columns) {
if !locked {
b.Lock()
}
for columnID >= len(b.Columns) {
b.Columns = append(b.Columns, nil)
}
if !locked {
b.Unlock()
}
}
// Ensure that the VectorParty is allocated with values and nulls.
if b.Columns[columnID] == nil {
b.liveStore.tableSchema.RLock()
dataType := b.liveStore.tableSchema.ValueTypeByColumn[columnID]
defaultValue := *b.liveStore.tableSchema.DefaultValues[columnID]
b.liveStore.tableSchema.RUnlock()
bytes := vectors.CalculateVectorPartyBytes(dataType, b.Capacity, true, false)
b.liveStore.HostMemoryManager.ReportUnmanagedSpaceUsageChange(int64(bytes))
liveVP := NewLiveVectorParty(b.Capacity, dataType, defaultValue, b.liveStore.HostMemoryManager)
liveVP.Allocate(false)
if !locked {
b.Lock()
}
b.Columns[columnID] = liveVP
if !locked {
b.Unlock()
}
return liveVP
}
return b.Columns[columnID].(common.LiveVectorParty)
}
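// The helper below is an illustrative sketch and not part of the original file: it materializes
// several columns of a batch in one pass. The batch lock is taken once by the caller, so
// locked=true is passed to GetOrCreateVectorParty and the lock state is left unchanged. The
// name ensureColumnsExample is hypothetical.
func ensureColumnsExample(s *LiveStore, batchID int32, columnIDs []int) {
    batch := s.GetBatchForWrite(batchID)
    if batch == nil {
        return
    }
    for _, columnID := range columnIDs {
        // locked=true: the batch is already write-locked by GetBatchForWrite above.
        batch.GetOrCreateVectorParty(columnID, true)
    }
    batch.Unlock()
}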
// MarshalJSON marshals a LiveBatch into json.
func (b *LiveBatch) MarshalJSON() ([]byte, error) {
b.RLock()
defer b.RUnlock()
return json.Marshal(map[string]interface{}{
"numColumns": len(b.Columns),
"capacity": b.Capacity,
})
}