memstore/backfill_manager.go (150 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
"sync"
"encoding/json"
memCom "github.com/uber/aresdb/memstore/common"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
)
// BackfillManager manages the records that need to be put into a backfill queue and merged with
// sorted batches directly.
//
// The methods of this type take the embedded RWMutex before touching the queue or the
// bookkeeping fields (advanceOffset and Destruct do not lock by themselves). Fields tagged
// `json:"-"` are left out of the JSON form produced by MarshalJSON.
type BackfillManager struct {
// Embedded lock; it is also the Locker that AppendCond is built on (see NewBackfillManager).
sync.RWMutex `json:"-"`
// Embedded limits (MaxBufferSize, BackfillThresholdInBytes); their fields are promoted.
BackfillConfig
// Name of the table.
TableName string `json:"-"`
// The shard id of the table.
Shard int `json:"-"`
// queue to hold UpsertBatches to backfill; Append adds to it, StartBackfill hands it out and
// resets it to nil.
UpsertBatches []*memCom.UpsertBatch `json:"-"`
// keep track of the number of records in backfill queue
NumRecords int `json:"numRecords"`
// keep track of the size of the buffer that holds batches to be backfilled
CurrentBufferSize int64 `json:"currentBufferSize"`
// keep track of the size of the buffer that holds batches being backfilled; set by
// StartBackfill and reset to 0 by advanceOffset.
BackfillingBufferSize int64 `json:"backfillingBufferSize"`
// keep track of the redo log file of the last batch backfilled
LastRedoFile int64 `json:"lastRedoFile"`
// keep track of the offset of the last batch backfilled
LastBatchOffset uint32 `json:"lastBatchOffset"`
// keep track of the redo log file of the last batch queued
CurrentRedoFile int64 `json:"currentRedoFile"`
// keep track of the offset of the last batch being queued
CurrentBatchOffset uint32 `json:"currentBatchOffset"`
// condition variable that WaitForBackfillBufferAvailability waits on and advanceOffset
// broadcasts on once the in-flight buffer has been released.
AppendCond *sync.Cond `json:"-"`
}
// BackfillConfig defines configs for backfill. It is embedded into BackfillManager, so its
// fields are read there directly (r.MaxBufferSize, r.BackfillThresholdInBytes).
type BackfillConfig struct {
// max buffer size to hold backfill data; compared against the sum of the queued and the
// in-flight buffer sizes (presumably in bytes, like the threshold below - confirm).
MaxBufferSize int64 `json:"maxBufferSize"`
// threshold to trigger backfill; QualifyToTriggerBackfill reports true once the queued
// buffer size reaches this value.
BackfillThresholdInBytes int64 `json:"backfillThresholdInBytes"`
}
// NewBackfillManager builds a BackfillManager for the given table and shard using the
// supplied limits. The queue and all counters start at their zero values, and AppendCond
// is created on top of the manager's own embedded RWMutex.
func NewBackfillManager(tableName string, shard int, config BackfillConfig) *BackfillManager {
	manager := &BackfillManager{
		BackfillConfig: config,
		TableName:      tableName,
		Shard:          shard,
	}
	// The condition variable shares the manager's lock, so waiters must hold r.Lock().
	manager.AppendCond = sync.NewCond(&manager.RWMutex)
	return manager
}
// WaitForBackfillBufferAvailability blocks the caller until the combined size of the queued
// buffer (CurrentBufferSize) and the in-flight buffer (BackfillingBufferSize) drops below
// MaxBufferSize.
//
// The write lock is held while the condition is evaluated; AppendCond.Wait releases it while
// the caller is suspended and re-acquires it before returning. The condition is re-checked
// after every wake-up, and a debug line is logged before each wait.
func (r *BackfillManager) WaitForBackfillBufferAvailability() {
	r.Lock()
	defer r.Unlock()

	for r.CurrentBufferSize+r.BackfillingBufferSize >= r.MaxBufferSize {
		// Note the trailing space after the first sentence: the two literals are concatenated.
		utils.GetLogger().Debugf("Waiting for slots of backfill manager to be available. "+
			"Current buffer size=%d, backfilling buffer size=%d, max buffer size=%d",
			r.CurrentBufferSize, r.BackfillingBufferSize, r.MaxBufferSize)
		r.AppendCond.Wait()
	}
}
// Append records redoFile/batchOffset as the current position and, when upsertBatch is not
// nil, adds the batch to the backfill queue and updates the record count, the queued buffer
// size and the buffer gauges.
//
// It returns true when the queued plus in-flight buffer size has reached 95% of
// MaxBufferSize, meaning the caller may need to wait before appending more; otherwise it
// returns false. A nil batch only advances the position and returns false.
func (r *BackfillManager) Append(upsertBatch *memCom.UpsertBatch, redoFile int64, batchOffset uint32) bool {
	r.Lock()
	defer r.Unlock()

	// advance position even if data is not for backfill
	r.CurrentRedoFile, r.CurrentBatchOffset = redoFile, batchOffset
	if upsertBatch == nil {
		return false
	}

	batchSize := len(upsertBatch.GetBuffer()) + upsertBatch.GetAlternativeBytes()
	utils.GetLogger().Debugf("Table %s: Backfill batch of size %v, redoLog=%d offset=%d", r.TableName, batchSize,
		redoFile, batchOffset)

	r.UpsertBatches = append(r.UpsertBatches, upsertBatch)
	r.NumRecords += upsertBatch.NumRows
	r.CurrentBufferSize += int64(batchSize)

	// Queued and in-flight bytes both count against the limit.
	occupied := r.CurrentBufferSize + r.BackfillingBufferSize

	reporter := utils.GetReporter(r.TableName, r.Shard)
	reporter.GetGauge(utils.BackfillBufferFillRatio).Update(float64(occupied) / float64(r.MaxBufferSize))
	reporter.GetGauge(utils.BackfillBufferSize).Update(float64(occupied))
	reporter.GetGauge(utils.BackfillBufferNumRecords).Update(float64(r.NumRecords))

	if occupied < int64(float64(r.MaxBufferSize)*0.95) {
		return false
	}

	utils.GetLogger().With("size", occupied).
		Warnf("Table %s Backfill buffer is full", r.TableName)
	return true
}
// ReadUpsertBatch reads rows from the upsert batch at position index of the backfill queue;
// user should not lock schema.
//
// index selects the batch in the queue, start and length are passed to the batch's ReadData,
// and schema is passed to the batch's GetColumnNames. The read lock is held for the whole call.
//
// When index is outside the queue (negative, or not less than the queue length) all three
// results are nil. When resolving the column names fails, that error is returned and no data
// is read. Otherwise the data and error from ReadData are returned together with the column
// names.
func (r *BackfillManager) ReadUpsertBatch(index, start, length int, schema *memCom.TableSchema) (data [][]interface{}, columnNames []string, err error) {
	r.RLock()
	defer r.RUnlock()

	// A negative index would panic on the slice access below, so it is treated the same way
	// as an index past the end of the queue.
	if index < 0 || index >= len(r.UpsertBatches) {
		return nil, nil, nil
	}

	upsertBatch := r.UpsertBatches[index]
	if columnNames, err = upsertBatch.GetColumnNames(schema); err != nil {
		return data, columnNames, err
	}

	data, err = upsertBatch.ReadData(start, length)
	return data, columnNames, err
}
// StartBackfill hands the queued UpsertBatches over to the caller and returns them together
// with CurrentRedoFile and CurrentBatchOffset.
//
// When batches are handed over, the queue, NumRecords and CurrentBufferSize are reset and the
// former CurrentBufferSize becomes BackfillingBufferSize. When CurrentBufferSize is zero the
// returned slice is nil and no state is changed; the current position is still returned.
// A "Start backfill" info line is logged on every call.
func (r *BackfillManager) StartBackfill() ([]*memCom.UpsertBatch, int64, uint32) {
	r.Lock()
	defer r.Unlock()

	redoFile, batchOffset := r.CurrentRedoFile, r.CurrentBatchOffset

	utils.GetLogger().With("action", "Backfill", "table", r.TableName, "shard", r.Shard,
		"lastRedoFile", r.LastRedoFile, "lastOffset", r.LastBatchOffset, "newRedoFile", redoFile,
		"newOffset", batchOffset).Info("Start backfill")

	queuedSize := r.CurrentBufferSize
	if queuedSize == 0 {
		// no data to backfill
		// but CurrentRedoFile/CurrentBatchOffset may not be checkpointed yet(live batch)
		return nil, redoFile, batchOffset
	}

	// Detach the queue and move its size from the "queued" to the "in-flight" counter.
	queued := r.UpsertBatches
	r.UpsertBatches = nil
	r.NumRecords = 0
	r.CurrentBufferSize = 0
	r.BackfillingBufferSize = queuedSize

	return queued, redoFile, batchOffset
}
// QualifyToTriggerBackfill reports whether the queued buffer size (CurrentBufferSize) has
// reached BackfillThresholdInBytes, i.e. whether a size-based backfill may be triggered.
// The comparison is made under the read lock.
func (r *BackfillManager) QualifyToTriggerBackfill() bool {
	r.RLock()
	reached := r.BackfillThresholdInBytes <= r.CurrentBufferSize
	r.RUnlock()
	return reached
}
// advanceOffset records redoFile/offset as the last backfilled position, resets the in-flight
// buffer size to zero and wakes every goroutine waiting on AppendCond.
// It does not lock by itself; Done calls it while holding the write lock.
func (r *BackfillManager) advanceOffset(redoFile int64, offset uint32) {
	r.LastRedoFile, r.LastBatchOffset = redoFile, offset
	r.BackfillingBufferSize = 0

	// Space has been released, so blocked appenders can re-check the buffer limit.
	r.AppendCond.Broadcast()
}
// GetLatestRedoFileAndOffset returns LastRedoFile and LastBatchOffset, i.e. the position
// stored by the most recent advanceOffset call. Both values are read under the read lock.
func (r *BackfillManager) GetLatestRedoFileAndOffset() (int64, uint32) {
	r.RLock()
	redoFile, batchOffset := r.LastRedoFile, r.LastBatchOffset
	r.RUnlock()
	return redoFile, batchOffset
}
// MarshalJSON encodes the BackfillManager as JSON under the read lock. The output contains
// the manager's JSON-visible fields plus an extra "numUpsertBatches" entry holding the
// current length of the UpsertBatches queue.
func (r *BackfillManager) MarshalJSON() ([]byte, error) {
	r.RLock()
	defer r.RUnlock()

	// A locally defined type does not carry BackfillManager's MarshalJSON method, so
	// json.Marshal does not call back into this function (avoids json.Marshal loop calls).
	type plainManager BackfillManager
	type managerView struct {
		*plainManager
		NumUpsertBatches int `json:"numUpsertBatches"`
	}

	view := managerView{
		plainManager:     (*plainManager)(r),
		NumUpsertBatches: len(r.UpsertBatches),
	}
	return json.Marshal(view)
}
// Destruct sets the golang object references used by backfill manager (the UpsertBatches
// queue) to nil to trigger gc earlier. The counters and positions are left untouched.
// Unlike the other methods that touch the queue, it does not acquire the lock.
// NOTE(review): presumably callers guarantee there are no concurrent users left - confirm.
func (r *BackfillManager) Destruct() {
r.UpsertBatches = nil
}
// Done updates the backfill progress both in metastore and in memory.
//
// The given position is applied only when it is ahead of the last recorded one (a later redo
// file, or the same redo file with a larger batch offset). In that case the progress is first
// written through metaStore.UpdateBackfillProgress; if that fails the error is returned and
// the in-memory state is left unchanged. On success a "Finish backfill" info line is logged
// and advanceOffset stores the new position, releases the in-flight buffer and wakes waiters.
// When the position is not ahead, nothing happens and nil is returned.
func (r *BackfillManager) Done(currentRedoFile int64, currentBatchOffset uint32,
	metaStore metaCom.MetaStore) error {
	r.Lock()
	defer r.Unlock()

	laterFile := currentRedoFile > r.LastRedoFile
	laterOffset := currentRedoFile == r.LastRedoFile && currentBatchOffset > r.LastBatchOffset
	if !laterFile && !laterOffset {
		return nil
	}

	// Persist first so that memory never gets ahead of the metastore.
	err := metaStore.UpdateBackfillProgress(r.TableName, r.Shard, currentRedoFile,
		currentBatchOffset)
	if err != nil {
		return err
	}

	utils.GetLogger().With("action", "Backfill", "table", r.TableName, "shard", r.Shard,
		"lastRedoFile", r.LastRedoFile, "lastOffset", r.LastBatchOffset, "newRedoFile", currentRedoFile,
		"newOffset", currentBatchOffset).Info("Finish backfill")

	r.advanceOffset(currentRedoFile, currentBatchOffset)
	return nil
}