in memstore/archiving.go [251:367]
// Archive advances the archiving cutoff of the given table shard to cutoff.
// It raises the live store's archiving high watermark, creates and switches to
// a new archive store version (merging records older than the cutoff into
// archive batches), checkpoints the redo log, purges obsolete batch versions
// on disk and in memory, re-reports memory accounting for the merged batches,
// and finally purges archived live-store batches.
//
// Progress is reported through reporter under the job key for this
// table/shard; duration and count metrics are always emitted on exit.
// Returns nil if the table/shard no longer exists (treated as a no-op), or an
// error if the cutoff does not strictly advance or any stage fails.
func (m *memStoreImpl) Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error {
	archivingTimer := utils.GetReporter(table, shardID).GetTimer(utils.ArchivingTimingTotal)
	start := utils.Now()
	jobKey := getIdentifier(table, shardID, memCom.ArchivingJobType)
	// Emit duration metrics and report back to scheduler.
	// Runs on every exit path, including early returns and errors.
	defer func() {
		duration := utils.Now().Sub(start)
		archivingTimer.Record(duration)
		reporter(jobKey, func(status *ArchiveJobDetail) {
			status.LastDuration = duration
		})
		utils.GetReporter(table, shardID).
			GetCounter(utils.ArchivingCount).Inc(1)
	}()
	// Record the target cutoff as "running" before any work begins so the
	// scheduler can observe an in-flight job.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.RunningCutoff = cutoff
	})
	shard, err := m.GetTableShard(table, shardID)
	if err != nil {
		// table already deleted. stop here
		utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
		return nil
	}
	// NOTE(review): GetTableShard appears to take a user reference on the
	// shard (hence the matching Done here) — confirm against its definition.
	defer shard.Users.Done()
	// The cutoff must strictly advance; otherwise there is nothing to archive.
	if cutoff <= shard.ArchiveStore.CurrentVersion.ArchivingCutoff {
		return utils.StackError(nil, "Cutoff %d is no greater than current cutoff %d",
			cutoff, shard.ArchiveStore.CurrentVersion.ArchivingCutoff)
	}
	// Update the archiving cutoff time high water mark so ingestion won't update records below
	// the new target archiving cutoff time.
	shard.LiveStore.WriterLock.Lock()
	shard.LiveStore.ArchivingCutoffHighWatermark = cutoff
	shard.LiveStore.PrimaryKey.UpdateEventTimeCutoff(cutoff)
	shard.LiveStore.WriterLock.Unlock()
	utils.GetReporter(table, shardID).GetGauge(utils.ArchivingHighWatermark).Update(float64(cutoff))
	// Create a new archive store version and switch to it.
	// patchByDay keys the affected archive batches by day; unmanagedMemoryBytes
	// is the merge output size accounted as unmanaged memory (deducted below).
	patchByDay, oldVersion, unmanagedMemoryBytes, err := shard.createNewArchiveStoreVersion(cutoff, reporter, jobKey)
	if err != nil {
		if err == metaCom.ErrTableDoesNotExist {
			// Table was dropped mid-archiving; treat as a successful no-op.
			utils.GetLogger().With("table", table, "shard", shardID).Warn("failed to create archive store version for non-exist table")
			return nil
		}
		return err
	}
	// Wait for queries in other goroutines to prevent archiving from prematurely purging the old version.
	oldVersion.Users.Wait()
	// Purge redo log files on disk.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.Stage = ArchivingPurge
	})
	// NOTE(review): the backfill manager's redo-file/offset presumably bound
	// the checkpoint so records not yet backfilled are retained — confirm
	// against CheckpointRedolog's contract.
	backfillMgr := shard.LiveStore.BackfillManager
	if err := shard.LiveStore.RedoLogManager.
		CheckpointRedolog(cutoff, backfillMgr.LastRedoFile,
			backfillMgr.LastBatchOffset); err != nil {
		return err
	}
	// delete obsolete batch versions if there are no peer bootstraping job running
	// (when the token is unavailable, disk data is kept; only in-memory copies
	// are destructed below).
	canDeleteData := false
	if m.options.bootstrapToken.AcquireToken(table, uint32(shardID)) {
		canDeleteData = true
		defer m.options.bootstrapToken.ReleaseToken(table, uint32(shardID))
	}
	// Delete obsolete (merged base batch) vector parties in oldArchivedStore. We don't need any lock since all queries
	// should already finish processing the old version.
	for day := range patchByDay {
		// We don't need to check existence again since it should be already created if missing in merge stage.
		batch := oldVersion.Batches[day]
		// Purge archive batch on disk.
		if canDeleteData {
			if err := m.diskStore.DeleteBatchVersions(table, shardID, int(day), batch.Version, batch.SeqNum); err != nil {
				return err
			}
		}
		// Purge archive batch in memory.
		batch.SafeDestruct()
	}
	// Report memory usage.
	// NOTE(review): GetCurrentVersion appears to take a user reference on the
	// returned version (hence the Done below) — confirm against its definition.
	newVersion := shard.ArchiveStore.GetCurrentVersion()
	defer newVersion.Users.Done()
	for day := range patchByDay {
		batch := newVersion.Batches[day]
		batch.RLock()
		for columnID, column := range batch.Columns {
			if column != nil {
				// The new merged batch is no longer unmanaged memory.
				bytes := column.GetBytes()
				shard.HostMemoryManager.ReportManagedObject(shard.Schema.Schema.Name, shardID, int(day), columnID, bytes)
			}
		}
		batch.RUnlock()
	}
	// Deduct the merge output from unmanaged usage; the same bytes were just
	// re-reported as managed objects above.
	shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(-unmanagedMemoryBytes)
	// Purge live store in memory.
	batchIDsToPurge := shard.LiveStore.getBatchIDsToPurge(cutoff)
	shard.LiveStore.PurgeBatches(batchIDsToPurge)
	// Mark completion and advance the recorded cutoffs for this job.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.Stage = ArchivingComplete
		status.LastCutoff = status.CurrentCutoff
		status.CurrentCutoff = cutoff
	})
	utils.GetReporter(table, shardID).GetGauge(utils.ArchivingLowWatermark).Update(float64(cutoff))
	return nil
}