memstore/host_memory_manager.go (461 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"container/heap"
"sync"
"sync/atomic"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
rbt "github.com/emirpasic/gods/trees/redblacktree"
"github.com/uber/aresdb/memstore/common"
)
// preloadJob defines the job struct to preload column when preloading days is changed.
// Jobs are sent on hostMemoryManager.preloadJobChan (see TriggerPreload) and
// consumed by the preloader loop started in Start.
type preloadJob struct {
	// tableName and columnID identify the column whose preloading window changed.
	tableName string
	columnID  int
	// oldPreloadingDays / newPreloadingDays are the preloading windows (in days)
	// before and after the schema change; work is only needed when the window grows
	// (see handleColumnPreloadingDaysChange).
	oldPreloadingDays int
	newPreloadingDays int
}
// hostMemoryManager tracks managed (archive batch) and unmanaged (live batch /
// primary key) memory usage against a configured budget, and drives preloading
// and eviction. It is returned as a common.HostMemoryManager by NewHostMemoryManager.
type hostMemoryManager struct {
	// The embedded lock guards batchInfosByColumn; the two size counters below
	// are updated with sync/atomic instead.
	sync.RWMutex
	memStore  *memStoreImpl
	metaStore metaCom.MetaStore
	// totalMemorySize is configurable uplimit.
	totalMemorySize int64
	// unManagedMemorySize and managedMemorySize reflect current memory usage status.
	// Accessed via atomic.AddInt64/LoadInt64 only.
	unManagedMemorySize int64
	managedMemorySize   int64
	// Init maps for table -> (columnID -> columnBatchInfos) mapping
	batchInfosByColumn map[string]map[int]*columnBatchInfos
	// channel to send preloadJob.
	preloadJobChan chan preloadJob
	// channel to stop preload go routines.
	preloadStopChan chan struct{}
	// channel to send eviction job.
	// TODO: if later we find to many go routines waiting for the channel,
	// we will use sync.Cond to rewrite it.
	evictionJobChan chan struct{}
	// channel to stop eviction go routines.
	evictionStopChan chan struct{}
}
// shardBatchID is the composite key used inside columnBatchInfos to identify
// one archive batch of one shard.
type shardBatchID struct {
	shardID int
	batchID int
}

// newShardBatchID builds the composite key for a (shard, batch) pair.
func newShardBatchID(shardID int, batchID int) shardBatchID {
	return shardBatchID{shardID: shardID, batchID: batchID}
}

// shardBatchIDComparator orders keys by ascending batchID (older days first);
// equal batchIDs are tie-broken by descending shardID.
func shardBatchIDComparator(a, b interface{}) int {
	left := a.(shardBatchID)
	right := b.(shardBatchID)
	if left.batchID != right.batchID {
		return left.batchID - right.batchID
	}
	return right.shardID - left.shardID
}
// columnBatchInfos is using RB-Tree data structure to hold shardBatchID to
// size mapping
// The embedded RWMutex guards batchInfoByID.
type columnBatchInfos struct {
	// table is the owning table's name; used later to look up the schema
	// during eviction.
	table string
	// batchInfoByID maps shardBatchID -> batch size in bytes (stored as int64),
	// ordered by shardBatchIDComparator.
	batchInfoByID *rbt.Tree
	sync.RWMutex
}
// newColumnBatchInfos creates an empty per-column batch registry for the given
// table, backed by a red-black tree ordered by shardBatchIDComparator.
func newColumnBatchInfos(table string) *columnBatchInfos {
	infos := &columnBatchInfos{
		table:         table,
		batchInfoByID: rbt.NewWith(shardBatchIDComparator),
	}
	return infos
}
// SetManagedObject is used to add a new batch/update an existing batch.
// It returns the byte delta of the operation: for a brand new batch that is
// simply bytes; for an update it is (bytes - previousBytes).
func (a *columnBatchInfos) SetManagedObject(shard, batchID int, bytes int64) int64 {
	a.Lock()
	defer a.Unlock()

	key := newShardBatchID(shard, batchID)
	delta := bytes
	if previous, found := a.batchInfoByID.Get(key); found {
		delta -= previous.(int64)
	}
	a.batchInfoByID.Put(key, bytes)
	return delta
}
// DeleteManagedObject is used to delete a batch.
// It returns the (non-positive) byte change, or 0 when the batch is not tracked.
func (a *columnBatchInfos) DeleteManagedObject(shard, batchID int) int64 {
	a.Lock()
	defer a.Unlock()

	key := newShardBatchID(shard, batchID)
	sizeInterface, found := a.batchInfoByID.Get(key)
	if !found {
		return 0
	}
	a.batchInfoByID.Remove(key)
	return -sizeInterface.(int64)
}
// GetArchiveMemoryUsageByShard returns memory usage [preload, non-preload] by shard
func (a *columnBatchInfos) GetArchiveMemoryUsageByShard(preloadDays int) map[int]*common.ColumnMemoryUsage {
	a.RLock()
	defer a.RUnlock()

	usageByShard := make(map[int]*common.ColumnMemoryUsage)
	it := a.batchInfoByID.Iterator()
	for it.Next() {
		key, _ := it.Key().(shardBatchID)
		bytes := it.Value().(int64)
		usage := usageByShard[key.shardID]
		if usage == nil {
			usage = &common.ColumnMemoryUsage{}
			usageByShard[key.shardID] = usage
		}
		// A batch inside the preloading window counts as preloaded memory.
		if isPreloadingBatch(key.batchID, preloadDays) {
			usage.Preloaded += uint(bytes)
		} else {
			usage.NonPreloaded += uint(bytes)
		}
	}
	return usageByShard
}
// NewHostMemoryManager is used to init a HostMemoryManager with the given
// memstore and total memory budget; it also publishes the budget as a gauge.
func NewHostMemoryManager(memStore *memStoreImpl, totalMemorySize int64) common.HostMemoryManager {
	// unManagedMemorySize and managedMemorySize start at their zero values.
	h := &hostMemoryManager{
		memStore:           memStore,
		metaStore:          memStore.metaStore,
		totalMemorySize:    totalMemorySize,
		batchInfosByColumn: make(map[string]map[int]*columnBatchInfos),
		preloadJobChan:     make(chan preloadJob),
		preloadStopChan:    make(chan struct{}),
		evictionJobChan:    make(chan struct{}),
		evictionStopChan:   make(chan struct{}),
	}
	utils.GetRootReporter().GetGauge(utils.TotalMemorySize).Update(float64(totalMemorySize))
	return h
}
// All the following three functions trigger preloading and eviction
// asynchronously. In addition, as time goes on, reloading and eviction can
// also be triggered automatically.
// ReportUnmanagedSpaceUsageChange : Increase/Decrease bytes to the unmanaged space
// usage (for live batches and PKeys).
// Positive bytes number means to increase UnmanagedSpaceUsage, negative number
// means to decrease UnmanagedSpaceUsage.
func (h *hostMemoryManager) ReportUnmanagedSpaceUsageChange(bytes int64) {
	// bytes is already int64; the original redundant int64(bytes) cast removed.
	atomic.AddInt64(&h.unManagedMemorySize, bytes)
	utils.GetRootReporter().GetGauge(utils.UnmanagedMemorySize).Update(float64(h.getUnmanagedSpaceUsage()))
	// Fix: eviction was triggered on bytes < 0 (usage shrinking). Only growth
	// can push total usage over the budget, and ReportManagedObject likewise
	// triggers eviction on additions — so trigger on increases instead.
	if bytes > 0 {
		h.TriggerEviction()
	}
}
// ReportManagedObject : Report space usage for a managed object (archive batch vector party).
// A positive bytes value records/updates the batch size and may kick off an
// eviction pass; a non-positive value removes the batch entry entirely.
func (h *hostMemoryManager) ReportManagedObject(table string, shard, batchID, columnID int, bytes int64) {
	if bytes > 0 {
		h.addOrUpdateManagedObject(table, shard, batchID, columnID, bytes)
		h.TriggerEviction()
	} else {
		h.deleteManagedObject(table, shard, batchID, columnID)
	}
	utils.GetRootReporter().GetGauge(utils.ManagedMemorySize).Update(float64(h.getManagedSpaceUsage()))
}
// Start will do a blocking preloading first and then start the go routines to do
// data preloading and eviction.
// NOTE(review): no blocking preload actually happens here before the log line
// below — confirm whether initial preloading moved elsewhere.
func (h *hostMemoryManager) Start() {
	utils.GetLogger().Info("HostMemoryManager: initial preloading done")

	// Preloader execution loop: consumes preload jobs until stopped.
	go func() {
		for {
			select {
			case job := <-h.preloadJobChan:
				h.handleColumnPreloadingDaysChange(job)
			case <-h.preloadStopChan:
				return
			}
		}
	}()

	// Evictor execution loop: runs an eviction pass per signal until stopped.
	go func() {
		for {
			select {
			case <-h.evictionJobChan:
				h.tryEviction()
			case <-h.evictionStopChan:
				return
			}
		}
	}()
}
// Stop stops the goroutines that do data preloading and eviction. It's a
// blocking call: each send below blocks until the corresponding loop in
// Start receives the stop signal (the channels are unbuffered).
func (h *hostMemoryManager) Stop() {
	h.preloadStopChan <- struct{}{}
	h.evictionStopChan <- struct{}{}
}
// TriggerPreload will handle the column preloading days config change and
// trigger the column preloading if necessary. It's an asynchronous call.
func (h *hostMemoryManager) TriggerPreload(tableName string, columnID int,
	oldPreloadingDays int, newPreloadingDays int) {
	job := preloadJob{
		tableName:         tableName,
		columnID:          columnID,
		oldPreloadingDays: oldPreloadingDays,
		newPreloadingDays: newPreloadingDays,
	}
	// Hand the job off on its own goroutine so the caller never blocks on the
	// unbuffered channel.
	go func() { h.preloadJobChan <- job }()
}
// TriggerEviction triggers the eviction. It's an asynchronous call: the send
// happens on a fresh goroutine so callers never block on the unbuffered
// evictionJobChan.
func (h *hostMemoryManager) TriggerEviction() {
	go func() { h.evictionJobChan <- struct{}{} }()
}
// getUnmanagedSpaceUsage atomically reads the current unmanaged (live batch /
// primary key) memory usage in bytes.
func (h *hostMemoryManager) getUnmanagedSpaceUsage() int64 {
	return atomic.LoadInt64(&h.unManagedMemorySize)
}

// getManagedSpaceUsage atomically reads the current managed (archive batch)
// memory usage in bytes.
func (h *hostMemoryManager) getManagedSpaceUsage() int64 {
	return atomic.LoadInt64(&h.managedMemorySize)
}
// GetArchiveMemoryUsageByTableShard get the managed memory details by table shard and column
func (h *hostMemoryManager) GetArchiveMemoryUsageByTableShard() (map[string]map[string]*common.ColumnMemoryUsage, error) {
	h.RLock()
	defer h.RUnlock()

	// tableName_shardID -> columnName -> columnMemoryUsage
	result := make(map[string]map[string]*common.ColumnMemoryUsage)
	for tableName, batchInfoByColumn := range h.batchInfosByColumn {
		tableSchema, err := h.memStore.GetSchema(tableName)
		if err != nil {
			// ignore deleted table
			continue
		}
		for columnID, batchInfo := range batchInfoByColumn {
			tableSchema.RLock()
			columnConfig := tableSchema.Schema.Columns[columnID]
			tableSchema.RUnlock()
			usageByShard := batchInfo.GetArchiveMemoryUsageByShard(columnConfig.Config.PreloadingDays)
			for shardID, columnUsage := range usageByShard {
				shardKey := getTableShardKey(tableName, shardID)
				perColumn, ok := result[shardKey]
				if !ok {
					perColumn = make(map[string]*common.ColumnMemoryUsage)
					result[shardKey] = perColumn
				}
				perColumn[columnConfig.Name] = columnUsage
			}
		}
	}
	return result, nil
}
// managedObjectExists : Return whether the corresponding managed object exists in managed memory.
func (h *hostMemoryManager) managedObjectExists(table string, shard, batchID, columnID int) bool {
	h.RLock()
	tableInMemoryBatches, found := h.batchInfosByColumn[table]
	if !found {
		h.RUnlock()
		return false
	}
	columnBatchInfos, found := tableInMemoryBatches[columnID]
	// h's lock only guards the maps; release it before touching the tree,
	// matching the pattern used by addOrUpdateManagedObject.
	h.RUnlock()
	if !found {
		return false
	}
	// Fix: the tree lookup previously ran without columnBatchInfos' own lock,
	// racing with SetManagedObject/DeleteManagedObject which mutate the tree
	// under that lock.
	columnBatchInfos.RLock()
	defer columnBatchInfos.RUnlock()
	key := newShardBatchID(shard, batchID)
	_, found = columnBatchInfos.batchInfoByID.Get(key)
	return found
}
// addOrUpdateManagedObject : Report space usage increase or update for a managed object (archive batch vector party).
// It creates the per-table and per-column registries on demand, records the
// batch size, and adjusts managedMemorySize by the resulting delta.
func (h *hostMemoryManager) addOrUpdateManagedObject(table string, shard, batchID, columnID int, bytes int64) {
	h.Lock()
	tableBatches, ok := h.batchInfosByColumn[table]
	if !ok {
		tableBatches = make(map[int]*columnBatchInfos)
		h.batchInfosByColumn[table] = tableBatches
	}
	columnInfos, ok := tableBatches[columnID]
	if !ok {
		columnInfos = newColumnBatchInfos(table)
		tableBatches[columnID] = columnInfos
	}
	// Release the map lock before taking the per-column lock inside
	// SetManagedObject.
	h.Unlock()

	bytesChange := columnInfos.SetManagedObject(shard, batchID, bytes)
	atomic.AddInt64(&h.managedMemorySize, bytesChange)
	utils.GetLogger().Debugf("addOrUpdateManagedObject(%s,%d,%d,%d,%d), bytesChange = %d, "+
		"managedMemorySize=%d\n ", table, shard, batchID, columnID, bytes, bytesChange, h.getManagedSpaceUsage())
}
// deleteManagedObject : Report space usage reduce for a managed object (archive batch vector party).
// It removes the (shard, batch) entry from the column registry, decreases
// managedMemorySize by the freed bytes, and drops the column registry from the
// table map once it becomes empty.
func (h *hostMemoryManager) deleteManagedObject(table string, shard, batchID, columnID int) {
	h.Lock()
	utils.GetLogger().Debugf("Trying to deleteManagedObject for table: %s, Shard: %d, batchID: %d, in %+v", table, shard, batchID, h.batchInfosByColumn)
	tableInMemoryBatches, found := h.batchInfosByColumn[table]
	if !found {
		utils.GetLogger().Debugf("Not found tableInMemoryBatches for table: %s", table)
		h.Unlock()
		return
	}
	columnBatchInfos, found := tableInMemoryBatches[columnID]
	h.Unlock()
	if !found {
		// Fix: columnID is an int; the original used the %s verb (go vet
		// "wrong type" warning and garbled output).
		utils.GetLogger().Debugf("Not found columnBatchInfos for columnID: %d in table: %s.", columnID, table)
		return
	}
	bytesChange := columnBatchInfos.DeleteManagedObject(shard, batchID)
	utils.GetLogger().Debugf("Before deleteManagedObject managedMemorySize : %d, bytesChange : %d", h.getManagedSpaceUsage(), bytesChange)
	atomic.AddInt64(&h.managedMemorySize, bytesChange)
	utils.GetLogger().Debugf("After deleteManagedObject managedMemorySize : %d", h.getManagedSpaceUsage())
	h.Lock()
	// Fix: read the tree size under columnBatchInfos' own read lock; other
	// goroutines mutate the tree under that lock, not under h's lock.
	columnBatchInfos.RLock()
	empty := columnBatchInfos.batchInfoByID.Size() == 0
	columnBatchInfos.RUnlock()
	if empty {
		delete(tableInMemoryBatches, columnID)
	}
	h.Unlock()
	utils.GetLogger().Debugf("deleteManagedObject(%s,%d,%d,%d), bytesChange = %d, managedMemorySize=%d\n ", table, shard, batchID, columnID, bytesChange, h.getManagedSpaceUsage())
}
// handleColumnPreloadingDaysChange handles the preloading config change for a column.
func (h *hostMemoryManager) handleColumnPreloadingDaysChange(j preloadJob) {
	// Only an enlarged preloading window requires loading more data.
	if j.newPreloadingDays <= j.oldPreloadingDays {
		return
	}

	// Snapshot shard IDs under the memstore read lock.
	h.memStore.RLock()
	var shardIDs []int
	for shardID := range h.memStore.TableShards[j.tableName] {
		shardIDs = append(shardIDs, shardID)
	}
	h.memStore.RUnlock()

	currentDay := int(utils.Now().Unix() / 86400)
	for _, shardID := range shardIDs {
		tableShard, err := h.memStore.GetTableShard(j.tableName, shardID)
		if err != nil {
			// Table shard may have already been removed from this node.
			continue
		}
		if tableShard.Schema.Schema.IsFactTable {
			// Preload the newly-covered day range [now-new, now-old).
			tableShard.PreloadColumn(j.columnID, currentDay-j.newPreloadingDays, currentDay-j.oldPreloadingDays)
		}
		// Balance the Users counter bumped when the shard was fetched.
		tableShard.Users.Done()
	}
}
// tryEviction : try to trigger eviction once
// unManagedMem + managedMem > totalAssignedMem. This method will pop batches
// from the per column holder data structure, calculate global priority
// based on column metadata, then push the batch into a priority queue.
// Eviction will happen through all the populated batches until memory usage
// decreases to a certain level. All failed eviction batches will be
// reinserted.
func (h *hostMemoryManager) tryEviction() {
	// Within budget: nothing to evict.
	if h.totalMemorySize-h.getManagedSpaceUsage()-h.getUnmanagedSpaceUsage() >= 0 {
		return
	}
	utils.GetLogger().Debugf("UnmanagedMem: %d + ManagedMem: %d is larger than totalMem: %d! Eviction is triggered.",
		h.getUnmanagedSpaceUsage(), h.getManagedSpaceUsage(), h.totalMemorySize)
	// Init all columnar priority batches.
	gpq := h.initialGlobalPriorityQueue()
	// Pop from the queue and evict until within budget or the queue drains.
	for h.totalMemorySize-h.getManagedSpaceUsage()-h.getUnmanagedSpaceUsage() < 0 && !gpq.isEmpty() {
		globalPriorityItem := gpq.pop()
		batchPriority := globalPriorityItem.priority
		columnBatchInfos := globalPriorityItem.value
		columnIt := globalPriorityItem.it
		tableSchema, err := h.memStore.GetSchema(columnBatchInfos.table)
		// Fix: the schema was previously used without checking err; a table
		// deleted mid-eviction returned a nil schema and crashed on RLock.
		if err != nil {
			continue
		}
		tableSchema.RLock()
		preloadingDays := tableSchema.Schema.Columns[batchPriority.columnID].Config.PreloadingDays
		tableSchema.RUnlock()
		if isPreloadingBatch(batchPriority.batchID, preloadingDays) {
			// Evicting from the preloading zone indicates real memory pressure;
			// surface it via metrics and a warning.
			utils.GetReporter(columnBatchInfos.table, batchPriority.shardID).
				GetCounter(utils.PreloadingZoneEvicted).Inc(1)
			utils.GetLogger().With(
				"table", columnBatchInfos.table,
				"shard", batchPriority.shardID,
				"batch", batchPriority.batchID,
				"column", batchPriority.columnID,
			).Warn("Column in preloading zone is evicted")
		}
		ok, err := h.memStore.TryEvictBatchColumn(columnBatchInfos.table, batchPriority.shardID, int32(batchPriority.batchID), batchPriority.columnID)
		if ok {
			utils.GetLogger().Debugf("Successfully evict batch from memstore: table %s, shardID %d, batchID %d, columnID %d, size %d",
				columnBatchInfos.table, batchPriority.shardID, batchPriority.batchID, batchPriority.columnID, batchPriority.size)
		} else {
			utils.GetLogger().Debugf("Failed to evict batch from memstore: table %s, shardID %d, batchID %d, columnID %d, size %d, errors: %s",
				columnBatchInfos.table, batchPriority.shardID, batchPriority.batchID, batchPriority.columnID, batchPriority.size, err)
		}
		// Adding the corresponding next batch of this column into the queue.
		if columnIt.Next() {
			gpq.pushBatchIntoGlobalPriorityQueue(h, columnBatchInfos, batchPriority.columnID, columnIt)
		}
	}
	// Still cannot meet the memory constraints even after evictions.
	if h.totalMemorySize-h.getManagedSpaceUsage()-h.getUnmanagedSpaceUsage() < 0 {
		utils.GetRootReporter().GetCounter(utils.MemoryOverflow).Inc(1)
		utils.GetLogger().Warn("Still cannot meet the memory constraints even after evictions")
	}
}
// pushBatchIntoGlobalPriorityQueue will generate a globalPriority object for
// the batch currently under columnIt and push it into the queue. Entries for
// deleted columns or tables are silently skipped.
func (gpq *globalPriorityQueue) pushBatchIntoGlobalPriorityQueue(h *hostMemoryManager,
	columnBatchInfos *columnBatchInfos, columnID int, columnIt rbt.Iterator) {
	sbID := columnIt.Key().(shardBatchID)
	size := columnIt.Value().(int64)
	tableSchema, err := h.memStore.GetSchema(columnBatchInfos.table)
	if err != nil {
		return
	}
	tableSchema.RLock()
	columnConfig := tableSchema.Schema.Columns[columnID]
	tableSchema.RUnlock()
	if columnConfig.Deleted {
		return
	}
	preloading := isPreloadingBatch(sbID.batchID, columnConfig.Config.PreloadingDays)
	batchPriority := createBatchPriority(sbID.shardID, columnID, preloading,
		columnConfig.Config.Priority, sbID.batchID, size)
	gpq.push(&globalPriorityItem{
		value:    columnBatchInfos,
		it:       columnIt,
		priority: batchPriority,
	})
	utils.GetLogger().Debugf("Pushed batchPrioirty %s to global queue", batchPriority)
}
// initialGlobalPriorityQueue will initialize a global priority queue and seed
// it with the first batch of every table column tracked in batchInfosByColumn.
func (h *hostMemoryManager) initialGlobalPriorityQueue() *globalPriorityQueue {
	gpq := newGlobalPriorityQueue()
	utils.GetLogger().Debugf("Trying to init priority queue to hold batch objects")
	h.RLock()
	defer h.RUnlock()
	for tableName, columnsBatchesList := range h.batchInfosByColumn {
		utils.GetLogger().Debugf("Looking at table:%s, columnsBatchesList.size() = %d", tableName, len(columnsBatchesList))
		for columnID, columnBatchInfos := range columnsBatchesList {
			it := columnBatchInfos.batchInfoByID.Iterator()
			// Only columns with at least one tracked batch contribute an entry.
			if it.Next() {
				gpq.pushBatchIntoGlobalPriorityQueue(h, columnBatchInfos, columnID, it)
			}
		}
	}
	return gpq
}
// globalPriority is the holding struct for all the neccesarry fields to
// compare batch priority in a global view.
type globalPriority struct {
	shardID  int
	columnID int
	// globalPriority comparison is based on the below 4 fields.
	isPreloading   bool
	columnPriority int64
	batchID        int
	size           int64
}

// compareInt64 returns -1, 0 or 1 as x is less than, equal to, or greater
// than y, without the overflow risk of subtracting.
func compareInt64(x, y int64) int {
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	}
	return 0
}

// globalPriorityComparator provides a basic comparison on globalPriority.
// Smaller compares first: non-preloading before preloading, then lower
// columnPriority, then older (smaller) batchID, then larger size.
func globalPriorityComparator(a, b interface{}) int {
	aAsserted := a.(*globalPriority)
	bAsserted := b.(*globalPriority)
	if aAsserted.isPreloading != bAsserted.isPreloading {
		if aAsserted.isPreloading {
			return 1
		}
		return -1
	}
	if aAsserted.columnPriority != bAsserted.columnPriority {
		// Fix: the original returned int(a-b); the int64 difference can
		// overflow / truncate on conversion and flip the comparison sign.
		return compareInt64(aAsserted.columnPriority, bAsserted.columnPriority)
	}
	if aAsserted.batchID != bAsserted.batchID {
		return aAsserted.batchID - bAsserted.batchID
	}
	// Fix: same overflow hazard for the int64 size tie-break (reversed so the
	// bigger batch compares first).
	return compareInt64(bAsserted.size, aAsserted.size)
}

// createBatchPriority bundles the eviction-ordering fields for one batch.
func createBatchPriority(shardID, columnID int, isPreloading bool, columnPriority int64, batchID int, size int64) *globalPriority {
	return &globalPriority{
		shardID:        shardID,
		columnID:       columnID,
		isPreloading:   isPreloading,
		columnPriority: columnPriority,
		batchID:        batchID,
		size:           size,
	}
}
// globalPriorityQueue definition START.
// A globalPriorityQueue implements heap.Interface and holds many Item pointers.
// Sorting method is based on globalPriorityComparator for struct globalPriority.

// globalPriorityItem ties a batch's priority to the column registry it came
// from, so the evictor can both evict the batch and advance to the column's
// next batch via the stored iterator.
type globalPriorityItem struct {
	// value is the per-column batch registry this entry belongs to.
	value *columnBatchInfos
	// it is the red-black tree iterator positioned at this entry's batch;
	// it.Next() yields the column's next eviction candidate.
	it rbt.Iterator
	// priority carries the fields used for global eviction ordering.
	priority *globalPriority
}

// globalPriorityQueue is a slice-backed heap of globalPriorityItem pointers.
type globalPriorityQueue []*globalPriorityItem

// Len implements heap.Interface (via sort.Interface).
func (gpq globalPriorityQueue) Len() int { return len(gpq) }

// Less implements heap.Interface. The heap is a min-heap on
// globalPriorityComparator, so pop() yields the best eviction candidate first
// (non-preloading, lowest column priority, oldest batch, largest size).
func (gpq globalPriorityQueue) Less(i, j int) bool {
	return globalPriorityComparator(gpq[i].priority, gpq[j].priority) < 0
}

// Swap implements heap.Interface.
func (gpq globalPriorityQueue) Swap(i, j int) {
	gpq[i], gpq[j] = gpq[j], gpq[i]
}

// Push implements heap.Interface; callers should use push(), which maintains
// heap order through container/heap.
func (gpq *globalPriorityQueue) Push(x interface{}) {
	globalPriorityItem := x.(*globalPriorityItem)
	*gpq = append(*gpq, globalPriorityItem)
}
// push inserts an item while maintaining heap order.
func (gpq *globalPriorityQueue) push(item *globalPriorityItem) {
	heap.Push(gpq, item)
}

// Pop implements heap.Interface: it removes and returns the slice's last
// element. Callers should use pop(), which goes through container/heap.
func (gpq *globalPriorityQueue) Pop() interface{} {
	old := *gpq
	last := len(old) - 1
	item := old[last]
	*gpq = old[:last]
	return item
}

// pop removes and returns the minimum item under globalPriorityComparator.
func (gpq *globalPriorityQueue) pop() *globalPriorityItem {
	return heap.Pop(gpq).(*globalPriorityItem)
}
// newGlobalPriorityQueue creates an empty global priority queue.
// The heap.Init call on the freshly-made empty slice was a no-op and has been
// dropped; an empty queue trivially satisfies the heap invariant.
func newGlobalPriorityQueue() *globalPriorityQueue {
	gpq := make(globalPriorityQueue, 0)
	return &gpq
}

// isEmpty reports whether the queue holds no items.
func (gpq *globalPriorityQueue) isEmpty() bool {
	return gpq.Len() == 0
}

// size returns the number of items currently queued.
func (gpq *globalPriorityQueue) size() int {
	return gpq.Len()
}
// globalPriorityQueue definition END.

// isPreloadingBatch will check if a given batchID falls into the preloading
// zone, i.e. within preloadingDays of today.
// batchID is daysSinceEpoch value.
func isPreloadingBatch(batchID, preloadingDays int) bool {
	// Simplified from if-cond { return true } return false (staticcheck S1008).
	return int(utils.Now().Unix()/86400)-batchID < preloadingDays
}