in memstore/purge.go [23:125]
// Purge removes the archive batches of the given table shard whose batch IDs
// fall in the half-open range [batchIDStart, batchIDEnd).
//
// The purge runs in three stages, each published through reporter under the
// shard's purge job key:
//
//  1. PurgeMetaData: the in-range batches are detached from the current
//     archive store version and their metadata is purged from the meta store.
//  2. PurgeDataFile: the batch data files are deleted from the disk store.
//  3. PurgeMemory: every column of each detached batch is destructed and the
//     host memory manager is told the column now occupies 0 bytes.
//
// On success the stage is set to PurgeComplete and the purge manager's
// LastPurgeTime is updated.
//
// Purge returns nil without doing any work when the bootstrap token cannot be
// acquired or when the shard cannot be found; both cases are only logged. It
// returns the error from the meta store or the disk store when either of them
// fails. In that case the already-detached batches are still destructed before
// returning, because they were removed from the version's batch map and no
// later purge would be able to find them again.
func (m *memStoreImpl) Purge(tableName string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error {
	// Check whether a peer bootstrapping job is running; skip this purge if the
	// token cannot be acquired.
	if !m.options.bootstrapToken.AcquireToken(tableName, uint32(shardID)) {
		utils.GetLogger().With("table", tableName, "shard", shardID).Error("Purge failed, unable to acquire bootstrap token, retry later")
		return nil
	}
	defer m.options.bootstrapToken.ReleaseToken(tableName, uint32(shardID))

	start := utils.Now()
	jobKey := getIdentifier(tableName, shardID, memCom.PurgeJobType)
	purgeTimer := utils.GetReporter(tableName, shardID).GetTimer(utils.PurgeTimingTotal)
	// Registered before any further early return, so the duration and the purge
	// counter are recorded for every attempt that got past the token check,
	// whether it succeeds or fails.
	defer func() {
		duration := utils.Now().Sub(start)
		purgeTimer.Record(duration)
		reporter(jobKey, func(status *PurgeJobDetail) {
			status.LastDuration = duration
		})
		utils.GetReporter(tableName, shardID).
			GetCounter(utils.PurgeCount).Inc(1)
	}()

	shard, err := m.GetTableShard(tableName, shardID)
	if err != nil {
		utils.GetLogger().With("table", tableName, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
		return nil
	}
	defer shard.Users.Done()

	currentVersion := shard.ArchiveStore.GetCurrentVersion()
	defer currentVersion.Users.Done()

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeMetaData
		status.BatchIDStart = batchIDStart
		status.BatchIDEnd = batchIDEnd
	})

	// Detach the in-range batches from memory. Map entries are removed even
	// when the stored batch pointer is nil; only non-nil batches need their
	// memory released later.
	var batchesToPurge []*ArchiveBatch
	currentVersion.Lock()
	for batchID, batch := range currentVersion.Batches {
		if batchID >= int32(batchIDStart) && batchID < int32(batchIDEnd) {
			delete(currentVersion.Batches, batchID)
			if batch != nil {
				batchesToPurge = append(batchesToPurge, batch)
			}
		}
	}
	currentVersion.Unlock()

	// releaseMemory destructs every column of every detached batch and reports
	// the column to the host memory manager with a size of 0. When onBatch is
	// non-nil it is called with the batch's index while the batch lock is held,
	// before any of its columns are touched.
	//
	// NOTE(review): SafeDestruct presumably frees memory that the Go garbage
	// collector does not manage, which is why skipping it would leak; confirm
	// against the vector party implementation.
	releaseMemory := func(onBatch func(index int)) {
		for index, batch := range batchesToPurge {
			batch.Lock()
			if onBatch != nil {
				onBatch(index)
			}
			for columnID, vp := range batch.Columns {
				if vp != nil {
					// Wait for users to finish before destructing.
					vp.(memCom.ArchiveVectorParty).WaitForUsers(true)
					vp.SafeDestruct()
					shard.HostMemoryManager.ReportManagedObject(tableName, shardID, int(batch.BatchID), columnID, 0)
				}
			}
			batch.Unlock()
		}
	}

	// Delete metadata of batches within range.
	err = shard.metaStore.PurgeArchiveBatches(tableName, shardID, batchIDStart, batchIDEnd)
	if err != nil {
		// The batches were already removed from currentVersion.Batches above,
		// so a retried purge cannot reach them; release them now rather than
		// leaving them behind.
		releaseMemory(nil)
		return err
	}

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeDataFile
	})

	// Delete data files on disk of batches within range.
	numBatches, err := shard.diskStore.DeleteBatches(tableName, shardID, batchIDStart, batchIDEnd)
	if err != nil {
		// Same reasoning as above: the detached batches must not outlive this
		// call just because the disk store failed.
		releaseMemory(nil)
		return err
	}

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.NumBatches = numBatches
	})
	utils.GetReporter(tableName, shardID).GetCounter(utils.PurgedBatches).Inc(int64(numBatches))

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeMemory
		status.Current = 0
		status.Total = len(batchesToPurge)
	})

	// Current is the zero-based index of the batch being released.
	releaseMemory(func(index int) {
		reporter(jobKey, func(status *PurgeJobDetail) {
			status.Current = index
		})
	})

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeComplete
	})

	shard.ArchiveStore.PurgeManager.Lock()
	shard.ArchiveStore.PurgeManager.LastPurgeTime = utils.Now()
	shard.ArchiveStore.PurgeManager.Unlock()
	return nil
}