memstore/memstore.go
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memstore

import (
	"fmt"
	"sync"

	"github.com/uber/aresdb/cluster/topology"
	"github.com/uber/aresdb/datanode/bootstrap"
	"github.com/uber/aresdb/diskstore"
	"github.com/uber/aresdb/memstore/common"
	metaCom "github.com/uber/aresdb/metastore/common"
	"github.com/uber/aresdb/utils"
)

// TableShardMemoryUsage contains the column memory usage and primary key memory usage of a table shard.
type TableShardMemoryUsage struct {
ColumnMemory map[string]*common.ColumnMemoryUsage `json:"cols"`
PrimaryKeyMemory uint `json:"pk"`
}
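
// A serialized TableShardMemoryUsage looks roughly like the following (illustrative shape only,
// see common.ColumnMemoryUsage for the per-column fields):
//   {"cols": {"<columnName>": { /* common.ColumnMemoryUsage */ }}, "pk": 2048}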

// MemStore defines the interface for managing multiple table shards in memory.
// It exists as an interface mainly so it can be mocked in unit tests.
type MemStore interface {
common.TableSchemaReader
bootstrap.Bootstrapable
// GetMemoryUsageDetails returns memory usage details for each table shard, keyed by "tableName_shardID".
GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error)
// GetScheduler returns the scheduler for scheduling archiving and backfill jobs.
GetScheduler() Scheduler
// GetHostMemoryManager returns the host memory manager
GetHostMemoryManager() common.HostMemoryManager
// AddTableShard adds a table shard to the memstore.
AddTableShard(table string, shardID int, totalShards int, needPeerCopy bool, needPurge bool)
// GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
GetTableShard(table string, shardID int) (*TableShard, error)
// RemoveTableShard removes a table shard from the memstore.
RemoveTableShard(table string, shardID int)
// FetchSchema fetches schemas from the metaStore, updates the in-memory copies of table schemas,
// and sets up watch channels for metaStore schema changes; used when bootstrapping the memstore.
FetchSchema() error
// InitShards loads/recovers data for shards initially owned by the current instance.
InitShards(schedulerOff bool, shardOwner topology.ShardOwner)
// HandleIngestion logs an upsert batch and applies it to the in-memory store.
HandleIngestion(table string, shardID int, upsertBatch *common.UpsertBatch) error
// Archive is the process of moving stable records in fact tables from live batches to archive
// batches.
Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error
// Backfill is the process of merging records with event time older than cutoff with
// archive batches.
Backfill(table string, shardID int, reporter BackfillJobDetailReporter) error
// Snapshot is the process of writing the current in-memory content of a dimension table's live store to disk.
Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error
// Purge is the process of purging out-of-retention archive batches.
Purge(table string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error
}
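
// A rough usage sketch (illustrative only; metaStore, diskStore, options, shardOwner and
// upsertBatch are assumed to be constructed elsewhere, and "my_table" is a hypothetical table):
//
//	memStore := NewMemStore(metaStore, diskStore, options)
//	if err := memStore.FetchSchema(); err != nil {
//		// handle schema bootstrap failure
//	}
//	memStore.InitShards(false /* schedulerOff */, shardOwner)
//	if err := memStore.HandleIngestion("my_table", 0 /* shardID */, upsertBatch); err != nil {
//		// handle ingestion failure
//	}
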
// memStoreImpl implements the MemStore interface.
type memStoreImpl struct {
// memStoreImpl mutex is used to protect the TableShards and TableSchemas maps.
//
// For Shard access:
// Readers/writers must call TableShard.Users.Add(1)
// before releasing this mutex, and call
// TableShard.Users.Done() when they are done with the Shard.
//
// A table Shard deleter must detach the Shard first, and then call
// TableShard.Users.Wait() before deleting the Shard.
//
// For schema access:
// User should lock the TableSchema before releasing this mutex.
//
sync.RWMutex
// Table name and Shard ID as the map keys.
TableShards map[string]map[int]*TableShard
// Schema for all tables in the system. Schemas are not deleted for simplicity
TableSchemas map[string]*common.TableSchema
HostMemManager common.HostMemoryManager
// reference to metaStore for registering watchers,
// fetch latest schema and store Shard versions.
metaStore metaCom.MetaStore
diskStore diskstore.DiskStore
options Options
// each MemStore should only have one scheduler instance.
scheduler Scheduler
}
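
// getTableShardKey returns the key used to identify a table shard in maps, in the form "tableName_shardID".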
func getTableShardKey(tableName string, shardID int) string {
return fmt.Sprintf("%s_%d", tableName, shardID)
}
// NewMemStore creates a MemStore from the specified MetaStore.
func NewMemStore(metaStore metaCom.MetaStore, diskStore diskstore.DiskStore, options Options) MemStore {
memStore := &memStoreImpl{
TableShards: make(map[string]map[int]*TableShard),
TableSchemas: make(map[string]*common.TableSchema),
metaStore: metaStore,
diskStore: diskStore,
options: options,
}
// Create HostMemoryManager
memStore.HostMemManager = NewHostMemoryManager(memStore, utils.GetConfig().TotalMemorySize)
memStore.scheduler = newScheduler(memStore)
return memStore
}
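
// GetMemoryUsageDetails returns the memory usage details (column memory and primary key memory)
// for each table shard, keyed by "tableName_shardID".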
func (m *memStoreImpl) GetMemoryUsageDetails() (map[string]TableShardMemoryUsage, error) {
archiveMemoryUsageByTableShard, err := m.HostMemManager.GetArchiveMemoryUsageByTableShard()
if err != nil {
return nil, err
}
totalMemoryUsageByTableShard := map[string]TableShardMemoryUsage{}
tableShardsSnapshot := map[string][]int{}
m.RLock()
for tableName, shards := range m.TableShards {
tableShardsSnapshot[tableName] = []int{}
for shardID := range shards {
tableShardsSnapshot[tableName] = append(tableShardsSnapshot[tableName], shardID)
}
}
m.RUnlock()
for tableName, shardIDs := range tableShardsSnapshot {
for _, shardID := range shardIDs {
tableShardKey := getTableShardKey(tableName, shardID)
shard, err := m.GetTableShard(tableName, shardID)
if err != nil {
return totalMemoryUsageByTableShard, err
}
tableShardMemoryUsage := TableShardMemoryUsage{}
tableShardMemoryUsage.ColumnMemory = map[string]*common.ColumnMemoryUsage{}
// primary key memory usage
shard.LiveStore.WriterLock.RLock()
tableShardMemoryUsage.PrimaryKeyMemory = shard.LiveStore.PrimaryKey.AllocatedBytes()
shard.LiveStore.WriterLock.RUnlock()
// archive memory usage
if archiveMemoryUsage, ok := archiveMemoryUsageByTableShard[tableShardKey]; ok {
tableShardMemoryUsage.ColumnMemory = archiveMemoryUsage
}
// live store memory usage
shard.getLiveMemoryUsageByColumns(tableShardMemoryUsage.ColumnMemory)
totalMemoryUsageByTableShard[tableShardKey] = tableShardMemoryUsage
shard.Users.Done()
}
}
return totalMemoryUsageByTableShard, nil
}
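
// getLiveMemoryUsageByColumns adds the live store memory usage of each column of the shard into
// the passed-in columnMemory map, keyed by column name.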
func (shard *TableShard) getLiveMemoryUsageByColumns(columnMemory map[string]*common.ColumnMemoryUsage) {
shard.Schema.RLock()
valueTypeByColumn := shard.Schema.GetValueTypeByColumn()
columnIDs := map[string]int{}
for columnName, columnID := range shard.Schema.ColumnIDs {
columnIDs[columnName] = columnID
}
shard.Schema.RUnlock()
for columnName, columnID := range columnIDs {
valueType := valueTypeByColumn[columnID]
liveStoreMemory := shard.LiveStore.GetMemoryUsageForColumn(valueType, columnID)
if memoryUsage, ok := columnMemory[columnName]; ok {
memoryUsage.Live = uint(liveStoreMemory)
} else {
columnMemory[columnName] = &common.ColumnMemoryUsage{
Live: uint(liveStoreMemory),
}
}
}
}
// GetTableShard gets the data for a pinned table Shard. Caller needs to unpin after use.
func (m *memStoreImpl) GetTableShard(table string, shardID int) (*TableShard, error) {
m.RLock()
defer m.RUnlock()
tableShardMap, ok := m.TableShards[table]
if !ok {
return nil, utils.StackError(nil, "Failed to get table Shard map for table %s", table)
}
tableShard, ok := tableShardMap[shardID]
if !ok {
return nil, utils.StackError(nil, "Failed to get Shard %d for table %s", shardID, table)
}
tableShard.Users.Add(1)
return tableShard, nil
}
// GetSchema returns schema for a table.
func (m *memStoreImpl) GetSchema(table string) (*common.TableSchema, error) {
m.RLock()
defer m.RUnlock()
schema, ok := m.TableSchemas[table]
if !ok {
return nil, utils.StackError(nil, "Failed to get table schema for table %s", table)
}
return schema, nil
}
// GetSchemas returns all table schemas. Callers need to hold a reader lock while accessing the returned map.
func (m *memStoreImpl) GetSchemas() map[string]*common.TableSchema {
return m.TableSchemas
}
// GetScheduler returns the scheduler instance bound to the MemStore.
func (m *memStoreImpl) GetScheduler() Scheduler {
return m.scheduler
}
// TryEvictBatchColumn tries to evict a column from a given table/Shard/batchID.
// It returns whether the column is gone (evicted now or already removed) and any error encountered.
func (m *memStoreImpl) TryEvictBatchColumn(table string, shardID int, batchID int32, columnID int) (bool, error) {
tableShard, err := m.GetTableShard(table, shardID)
if err != nil {
return false, utils.StackError(err, "Failed to delete batch %d from Shard %d for table %s", batchID, shardID, table)
}
defer tableShard.Users.Done()
currentVersion := tableShard.ArchiveStore.GetCurrentVersion()
defer currentVersion.Users.Done()
currentVersion.RLock()
archivingBatch, ok := currentVersion.Batches[batchID]
currentVersion.RUnlock()
if !ok {
utils.GetLogger().Debugf("Batch already got removed from memstore: table %s, shardID %d, batchID %d, columnID %d", table, shardID, batchID, columnID)
return true, nil
}
if evictedVP := archivingBatch.TryEvict(columnID); evictedVP == nil {
return false, nil
}
utils.GetLogger().Debugf("Successfully evict batch from memstore: table %s, shardID %d, batchID %d, columnID %d", table, shardID, batchID, columnID)
return true, nil
}
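
// AddTableShard creates and adds a table shard to the memstore if it does not exist yet.
// When needPurge is true, the shard's existing on-disk data and metadata are deleted first
// so the new shard starts from a clean state.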
func (m *memStoreImpl) AddTableShard(table string, shardID int, totalShards int, needPeerCopy bool, needPurge bool) {
m.Lock()
defer m.Unlock()
schema := m.TableSchemas[table]
if schema == nil {
// table might get deleted at this point
return
}
shardMap := m.TableShards[table]
if shardMap == nil {
shardMap = make(map[int]*TableShard)
}
if _, exist := shardMap[shardID]; !exist {
// create new shard
tableShard := NewTableShard(schema, m.metaStore, m.diskStore, m.HostMemManager, shardID, totalShards, m.options)
if needPeerCopy {
tableShard.needPeerCopy = 1
}
// purge existing on-disk data and metadata to make sure storage is clean for the new table shard when it is added
if needPurge {
if err := tableShard.diskStore.DeleteTableShard(table, shardID); err != nil {
utils.GetLogger().With("table", table, "shard", shardID, "error", err.Error()).Fatalf("failed to purge table shard data")
}
if err := tableShard.metaStore.DeleteTableShard(table, shardID); err != nil {
utils.GetLogger().With("table", table, "shard", shardID, "error", err.Error()).Fatal("failed to purge table shard metadata")
}
}
shardMap[shardID] = tableShard
utils.AddTableShardReporter(table, shardID)
}
m.TableShards[table] = shardMap
}
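
// RemoveTableShard detaches the table shard from the memstore and then destructs it.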
func (m *memStoreImpl) RemoveTableShard(table string, shardID int) {
var shard *TableShard
// Detach first.
m.Lock()
shards := m.TableShards[table]
if shards != nil {
shard = shards[shardID]
delete(shards, shardID)
utils.DeleteTableShardReporter(table, shardID)
}
m.Unlock()
// Destruct.
if shard != nil {
shard.Destruct()
}
}
// preloadAllFactTables preloads recent days data for all columns of all table shards into memory.
// The number of preloading days is defined at each column level. This call will happen at
// shard initialization stage.
func (m *memStoreImpl) preloadAllFactTables() {
tableShardSnapshot := make(map[string][]int)
// snapshot the (tableName, shardID) pairs.
m.RLock()
for tableName, shardMap := range m.TableShards {
tableShardSnapshot[tableName] = make([]int, 0, len(shardMap))
for shardID := range shardMap {
tableShardSnapshot[tableName] = append(tableShardSnapshot[tableName], shardID)
}
}
m.RUnlock()
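// compute the current day as the number of days since the Unix epoch (86400 seconds per day)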
currentDay := int(utils.Now().Unix() / 86400)
for tableName, shardIDs := range tableShardSnapshot {
for _, shardID := range shardIDs {
tableShard, err := m.GetTableShard(tableName, shardID)
// Table shard may have already been removed from this node.
if err != nil {
continue
}
tableShard.Schema.RLock()
columns := tableShard.Schema.Schema.Columns
isFactTable := tableShard.Schema.Schema.IsFactTable
tableShard.Schema.RUnlock()
if isFactTable {
archiveStoreVersion := tableShard.ArchiveStore.GetCurrentVersion()
for columnID, column := range columns {
if !column.Deleted {
preloadingDays := column.Config.PreloadingDays
tableShard.PreloadColumn(columnID, currentDay-preloadingDays, currentDay)
}
}
archiveStoreVersion.Users.Done()
}
tableShard.Users.Done()
}
}
}
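
// GetHostMemoryManager returns the host memory manager.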
func (m *memStoreImpl) GetHostMemoryManager() common.HostMemoryManager {
return m.HostMemManager
}