// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memstore

import (
"sort"
memCom "github.com/uber/aresdb/memstore/common"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
)

// liveStoreSnapshot stores a snapshot of the LiveStore structure
// for fast archiving reads without mutex locking or map accesses.
// The structure is snapshotted so that new batch/vector party creation by
// ingestion will not affect archiving. The underlying data is still shared
// with ingestion running in parallel, with no read/write conflict, assuming that:
//   - archiving will not read beyond lastReadRecord for newly appended records
//   - archiving will only read records older than the cutoff, while ingestion
//     will not update any record older than that (backfill is delayed until
//     the ongoing archiving run completes).
type liveStoreSnapshot struct {
// Stores the structure as [RandomBatchIndex][ColumnID].
batches [][]memCom.VectorParty
// For purging live batch later.
batchIDs []int32
numRecordsInLastBatch int
}

// snapshot creates a snapshot of the LiveStore structure for fast archiving
// and backfill reads.
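// A hedged usage sketch, mirroring createNewArchiveStoreVersion below:
//
//	ss := shard.LiveStore.snapshot()
//	patchByDay := ss.createArchivingPatches(cutoff, oldVersion.ArchivingCutoff,
//		sortColumns, reporter, jobKey, tableName, shardID)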
func (s *LiveStore) snapshot() (ss liveStoreSnapshot) {
batchIDs, numRecordsInLastBatch := s.GetBatchIDs()
ss.batchIDs = batchIDs
ss.batches = make([][]memCom.VectorParty, len(batchIDs))
ss.numRecordsInLastBatch = numRecordsInLastBatch
for i, batchID := range batchIDs {
batch := s.GetBatchForRead(batchID)
// Live batches are purged by archiving so all batches returned here
// should be valid.
ss.batches[i] = make([]memCom.VectorParty, len(batch.Columns))
copy(ss.batches[i], batch.Columns)
batch.RUnlock()
}
return
}

// archivingPatch stores the records to be patched onto an archive batch.
// The records are identified by recordIDs, and their data lives in the snapshot.
// The records will be sorted according to the sortColumns.
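// archivingPatch implements sort.Interface, so a per-day patch is ordered
// with the standard library before merging (see createNewArchiveStoreVersion):
//
//	sort.Sort(patch)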
type archivingPatch struct {
// RecordID.BatchID here refers to the RandomBatchIndex in the snapshot.
recordIDs []memCom.RecordID
sortColumns []int
// Read-only: we won't change it while sorting the archiving patch or merging
// it with the archive batch.
data liveStoreSnapshot
}

// createArchivingPatches creates one archiving patch per affected UTC day.
// The key of the returned map is the number of days since the Unix epoch.
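// Days are computed as eventTime / 86400; for example, an event time of
// 1514764800 (2018-01-01T00:00:00 UTC) lands on day 17532.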
func (ss liveStoreSnapshot) createArchivingPatches(
cutoff uint32, oldCutoff uint32, sortColumns []int,
reporter ArchiveJobDetailReporter, jobKey string, tableName string, shardID int,
) map[int32]*archivingPatch {
patchByDay := make(map[int32]*archivingPatch)
numBatches := len(ss.batches)
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Stage = ArchivingCreatePatch
status.Current = 0
status.Total = numBatches
})
var numRecordsArchived int64
var numRecordsIgnored int64
for batchIdx, batch := range ss.batches {
timeColumn := batch[0]
numRecords := timeColumn.GetLength()
minValue, _ := timeColumn.(memCom.LiveVectorParty).GetMinMaxValue()
if batchIdx == len(ss.batches)-1 {
numRecords = ss.numRecordsInLastBatch
}
if minValue < cutoff {
for recordIdx := 0; recordIdx < numRecords; recordIdx++ {
dataValue := timeColumn.GetDataValue(recordIdx)
if dataValue.Valid {
time := *(*uint32)(dataValue.OtherVal)
if time < cutoff {
if time >= oldCutoff {
// Add the record for archiving
day := int32(time / 86400)
patch := patchByDay[day]
if patch == nil {
patch = &archivingPatch{
data: ss,
sortColumns: sortColumns,
}
patchByDay[day] = patch
}
patch.recordIDs = append(patch.recordIDs,
memCom.RecordID{BatchID: int32(batchIdx), Index: uint32(recordIdx)})
numRecordsArchived++
} else {
numRecordsIgnored++
}
}
}
}
}
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Current = batchIdx + 1
})
}
utils.GetLogger().With("action", "archiving", "table", tableName, "shard", shardID, "numArchived",
numRecordsArchived, "numIgnored", numRecordsIgnored, "oldCutoff", oldCutoff, "newCutoff", cutoff).Info("createArchivingPatches")
utils.GetReporter(tableName, shardID).GetCounter(utils.ArchivingIgnoredRecords).Inc(numRecordsIgnored)
utils.GetReporter(tableName, shardID).GetCounter(utils.ArchivingRecords).Inc(numRecordsArchived)
reporter(jobKey, func(status *ArchiveJobDetail) {
status.NumRecords = int(numRecordsArchived)
})
return patchByDay
}

// getBatchIDsToPurge returns the list of batch IDs in the live store to purge:
// a batch qualifies when its max event time is less than the cutoff.
// We do not purge the last batch if it is only partially archived
// (ss.numRecordsInLastBatch != lastBatch.Size).
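// A hedged sketch of the intended call sequence (see Archive below):
//
//	batchIDs := shard.LiveStore.getBatchIDsToPurge(cutoff)
//	shard.LiveStore.PurgeBatches(batchIDs)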
func (s *LiveStore) getBatchIDsToPurge(cutoff uint32) []int32 {
var batchIDs []int32
s.RLock()
s.tableSchema.RLock()
allowMissingEventTime := s.tableSchema.Schema.Config.AllowMissingEventTime
s.tableSchema.RUnlock()
for batchID, batch := range s.Batches {
if batchID >= s.LastReadRecord.BatchID {
continue
}
timeColumn := batch.Columns[0]
if timeColumn != nil {
if _, maxValue := timeColumn.(memCom.LiveVectorParty).GetMinMaxValue(); maxValue < cutoff {
if !allowMissingEventTime || timeColumn.GetNonDefaultValueCount() == batch.Capacity || batch.MaxArrivalTime < cutoff {
batchIDs = append(batchIDs, batchID)
}
}
} else if !allowMissingEventTime || batch.MaxArrivalTime < cutoff {
batchIDs = append(batchIDs, batchID)
}
}
s.RUnlock()
return batchIDs
}
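
// Len returns the number of records in the patch, as part of sort.Interface.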
func (ap archivingPatch) Len() int {
return len(ap.recordIDs)
}
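
// Swap exchanges two record IDs in place, as part of sort.Interface.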
func (ap archivingPatch) Swap(i, j int) {
ap.recordIDs[i], ap.recordIDs[j] = ap.recordIDs[j], ap.recordIDs[i]
}
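
// Less reports whether record i orders before record j, comparing the two
// records column by column over sortColumns, as part of sort.Interface.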
func (ap archivingPatch) Less(i, j int) bool {
for _, columnID := range ap.sortColumns {
iValue := ap.GetDataValue(i, columnID)
jValue := ap.GetDataValue(j, columnID)
res := iValue.Compare(jValue)
if res != 0 {
return res < 0
}
// Tie, move on to next sort column.
}
return false
}

// GetDataValue reads the value of the given column for the row at the given
// position in recordIDs.
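// recordIDs[row].BatchID selects the batch in the snapshot (the random batch
// index), and recordIDs[row].Index selects the row within that live batch.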
func (ap *archivingPatch) GetDataValue(row, columnID int) memCom.DataValue {
recordID := ap.recordIDs[row]
batch := ap.data.batches[recordID.BatchID]
if columnID >= len(batch) {
return memCom.NullDataValue
}
vp := batch[columnID]
if vp == nil {
return memCom.NullDataValue
}
return vp.GetDataValue(int(recordID.Index))
}

// GetDataValueWithDefault reads the value of the given column for the row at
// the given position in recordIDs. If the column is missing, it returns the
// passed default value instead.
func (ap *archivingPatch) GetDataValueWithDefault(row, columnID int, defaultValue memCom.DataValue) memCom.DataValue {
recordID := ap.recordIDs[row]
batch := ap.data.batches[recordID.BatchID]
if columnID >= len(batch) {
return defaultValue
}
vp := batch[columnID]
if vp == nil {
return defaultValue
}
return vp.GetDataValue(int(recordID.Index))
}

// GetCount returns the number of elements in the specified row/column; it is
// only valid for array values.
func (ap *archivingPatch) GetCount(row, columnID int) int {
recordID := ap.recordIDs[row]
batch := ap.data.batches[recordID.BatchID]
if columnID >= len(batch) {
return 0
}
vp := batch[columnID]
if vp == nil {
return 0
}
if vp.IsList() {
return int(vp.AsList().GetElemCount(int(recordID.Index)))
}
return 0
}

// Archive is the process of periodically moving stable records in fact tables
// from live batches to archive batches, and converting them to a compressed
// format (run-length encoding). This is a blocking call, so the caller needs
// to wait for the archiving process to finish.
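// A minimal, hypothetical sketch of a caller; the no-op reporter below
// assumes ArchiveJobDetailReporter's underlying signature matches these calls:
//
//	var reporter ArchiveJobDetailReporter = func(jobKey string, mutator func(status *ArchiveJobDetail)) {
//		// Ignore progress updates.
//	}
//	if err := memStore.Archive("trips", 0, cutoff, reporter); err != nil {
//		// Handle or log the archiving failure.
//	}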
func (m *memStoreImpl) Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error {
archivingTimer := utils.GetReporter(table, shardID).GetTimer(utils.ArchivingTimingTotal)
start := utils.Now()
jobKey := getIdentifier(table, shardID, memCom.ArchivingJobType)
// Emit duration metrics and report back to scheduler.
defer func() {
duration := utils.Now().Sub(start)
archivingTimer.Record(duration)
reporter(jobKey, func(status *ArchiveJobDetail) {
status.LastDuration = duration
})
utils.GetReporter(table, shardID).
GetCounter(utils.ArchivingCount).Inc(1)
}()
reporter(jobKey, func(status *ArchiveJobDetail) {
status.RunningCutoff = cutoff
})
shard, err := m.GetTableShard(table, shardID)
if err != nil {
// Table is already deleted; stop here.
utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
return nil
}
defer shard.Users.Done()
if cutoff <= shard.ArchiveStore.CurrentVersion.ArchivingCutoff {
return utils.StackError(nil, "Cutoff %d is no greater than current cutoff %d",
cutoff, shard.ArchiveStore.CurrentVersion.ArchivingCutoff)
}
// Update the archiving cutoff time high water mark so ingestion won't update records below
// the new target archiving cutoff time.
shard.LiveStore.WriterLock.Lock()
shard.LiveStore.ArchivingCutoffHighWatermark = cutoff
shard.LiveStore.PrimaryKey.UpdateEventTimeCutoff(cutoff)
shard.LiveStore.WriterLock.Unlock()
utils.GetReporter(table, shardID).GetGauge(utils.ArchivingHighWatermark).Update(float64(cutoff))
// Create a new archive store version and switch to it.
patchByDay, oldVersion, unmanagedMemoryBytes, err := shard.createNewArchiveStoreVersion(cutoff, reporter, jobKey)
if err != nil {
if err == metaCom.ErrTableDoesNotExist {
utils.GetLogger().With("table", table, "shard", shardID).Warn("failed to create archive store version for non-exist table")
return nil
}
return err
}
// Wait for queries in other goroutines to prevent archiving from prematurely purging the old version.
oldVersion.Users.Wait()
// Purge redo log files on disk.
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Stage = ArchivingPurge
})
backfillMgr := shard.LiveStore.BackfillManager
if err := shard.LiveStore.RedoLogManager.
CheckpointRedolog(cutoff, backfillMgr.LastRedoFile,
backfillMgr.LastBatchOffset); err != nil {
return err
}
// Delete obsolete batch versions only if no peer bootstrapping job is running.
canDeleteData := false
if m.options.bootstrapToken.AcquireToken(table, uint32(shardID)) {
canDeleteData = true
defer m.options.bootstrapToken.ReleaseToken(table, uint32(shardID))
}
// Delete obsolete (merged base batch) vector parties in the old archive store version. We don't need any locks since
// all queries have already finished processing the old version.
for day := range patchByDay {
// We don't need to check existence again since the batch was already created in the merge stage if it was missing.
batch := oldVersion.Batches[day]
// Purge archive batch on disk.
if canDeleteData {
if err := m.diskStore.DeleteBatchVersions(table, shardID, int(day), batch.Version, batch.SeqNum); err != nil {
return err
}
}
// Purge archive batch in memory.
batch.SafeDestruct()
}
// Report memory usage.
newVersion := shard.ArchiveStore.GetCurrentVersion()
defer newVersion.Users.Done()
for day := range patchByDay {
batch := newVersion.Batches[day]
batch.RLock()
for columnID, column := range batch.Columns {
if column != nil {
// The new merged batch is no longer unmanaged memory.
bytes := column.GetBytes()
shard.HostMemoryManager.ReportManagedObject(shard.Schema.Schema.Name, shardID, int(day), columnID, bytes)
}
}
batch.RUnlock()
}
shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(-unmanagedMemoryBytes)
// Purge live store in memory.
batchIDsToPurge := shard.LiveStore.getBatchIDsToPurge(cutoff)
shard.LiveStore.PurgeBatches(batchIDsToPurge)
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Stage = ArchivingComplete
status.LastCutoff = status.CurrentCutoff
status.CurrentCutoff = cutoff
})
utils.GetReporter(table, shardID).GetGauge(utils.ArchivingLowWatermark).Update(float64(cutoff))
return nil
}
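
// createNewArchiveStoreVersion snapshots the live store, builds one archiving
// patch per affected UTC day, merges each patch with its base archive batch,
// persists each merged batch to disk and the metastore, and swaps the new
// version in as CurrentVersion. It returns the patches, the replaced version,
// and the unmanaged memory consumed by merging so the caller can finish
// purging and reporting.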
func (shard *TableShard) createNewArchiveStoreVersion(cutoff uint32, reporter ArchiveJobDetailReporter, jobKey string) (
patchByDay map[int32]*archivingPatch, oldVersion *ArchiveStoreVersion, unmanagedMemoryBytes int64, err error) {
// Block column deletion
shard.columnDeletion.Lock()
defer shard.columnDeletion.Unlock()
tableName := shard.Schema.Schema.Name
shardID := shard.ShardID
// Snapshot schema
shard.Schema.RLock()
sortColumns := shard.Schema.Schema.ArchivingSortColumns
dataTypes := shard.Schema.ValueTypeByColumn
defaultValues := shard.Schema.DefaultValues
numColumns := len(dataTypes)
columnDeletions := shard.Schema.GetColumnDeletions()
shard.Schema.RUnlock()
oldVersion = shard.ArchiveStore.CurrentVersion
// Snapshot unsorted store structure, but not the data.
ss := shard.LiveStore.snapshot()
// Scan unsorted snapshot for stable records.
patchByDay = ss.createArchivingPatches(cutoff, oldVersion.ArchivingCutoff, sortColumns,
reporter, jobKey, tableName, shardID)
newVersion := NewArchiveStoreVersion(cutoff, shard)
// Begin the merge.
dayIdx := 1
numPatches := len(patchByDay)
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Stage = ArchivingMerge
status.Current = 0
status.Total = numPatches
status.NumAffectedDays = numPatches
})
for day, patch := range patchByDay {
sort.Sort(patch)
batchID := int(day)
baseBatch := shard.ArchiveStore.CurrentVersion.RequestBatch(day)
var requestedVPs []memCom.ArchiveVectorParty
// We need to load all columns into memory for archiving.
for columnID := 0; columnID < numColumns; columnID++ {
requestedVP := baseBatch.RequestVectorParty(columnID)
requestedVP.WaitForDiskLoad()
requestedVPs = append(requestedVPs, requestedVP)
}
ctx := newMergeContext(baseBatch, patch, columnDeletions,
dataTypes, defaultValues, nil)
ctx.merge(cutoff, 0)
unmanagedMemoryBytes += ctx.unmanagedMemoryBytes
newVersion.Batches[day] = ctx.merged
// Unpin columns requested in this batch to unblock eviction.
UnpinVectorParties(requestedVPs)
if err = newVersion.Batches[day].WriteToDisk(); err != nil {
return
}
if err = shard.metaStore.AddArchiveBatchVersion(
shard.Schema.Schema.Name, shard.ShardID, batchID, cutoff, uint32(0),
newVersion.Batches[day].Size); err != nil {
return
}
reporter(jobKey, func(status *ArchiveJobDetail) {
status.Current = dayIdx
})
dayIdx++
}
oldVersion.RLock()
// Copy unmerged base batch into new version.
for day, batch := range oldVersion.Batches {
if _, ok := patchByDay[day]; !ok {
newVersion.Batches[day] = batch
}
}
oldVersion.RUnlock()
if err = shard.metaStore.UpdateArchivingCutoff(
shard.Schema.Schema.Name, shard.ShardID, cutoff); err != nil {
return
}
shard.ArchiveStore.Lock()
shard.ArchiveStore.CurrentVersion = newVersion
shard.ArchiveStore.Unlock()
return
}