// memstore/backfill.go (lines 108-266)
// createNewArchiveStoreVersionForBackfill applies the given per-day backfill
// patches to the shard's archive store, producing and installing a new
// ArchiveStoreVersion for each affected day. For every day it pins the base
// batch's vector parties, runs the backfill, persists the new batch (when the
// batch actually changed), swaps in the new version, waits for readers of the
// old version to drain, and then purges superseded data on disk and in memory.
// Progress and lock-duration metrics are emitted through reporter under jobKey.
func (shard *TableShard) createNewArchiveStoreVersionForBackfill(
	backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string) (err error) {
	// Block column deletion for the whole job so the column set stays stable.
	shard.columnDeletion.Lock()
	defer shard.columnDeletion.Unlock()

	// Snapshot schema under the read lock so the rest of the job works
	// against a consistent view even if the schema changes concurrently.
	shard.Schema.RLock()
	columnDeletions := shard.Schema.GetColumnDeletions()
	sortColumns := shard.Schema.Schema.ArchivingSortColumns
	primaryKeyColumns := shard.Schema.Schema.PrimaryKeyColumns
	dataTypes := shard.Schema.ValueTypeByColumn
	defaultValues := shard.Schema.DefaultValues
	numColumns := len(shard.Schema.ValueTypeByColumn)
	shard.Schema.RUnlock()

	var numAffectedDays int
	dayIdx := 1
	reporter(jobKey, func(status *BackfillJobDetail) {
		status.Stage = BackfillApplyPatch
		status.Current = 0
		status.Total = len(backfillPatches)
	})

	lockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).
		GetTimer(utils.BackfillLockTiming)
	var totalLockDuration time.Duration
	// Record accumulated lock time even when we return early on error.
	defer func() {
		lockTimer.Record(totalLockDuration)
		reporter(jobKey, func(status *BackfillJobDetail) {
			status.LockDuration = totalLockDuration
		})
	}()

	// Only those batches that are affected and changed need to be cleaned.
	for day, patch := range backfillPatches {
		baseBatch := shard.ArchiveStore.CurrentVersion.RequestBatch(day)
		// Pin every column of the base batch and make sure it is resident.
		var requestedVPs []memCom.ArchiveVectorParty
		for columnID := 0; columnID < numColumns; columnID++ {
			requestedVP := baseBatch.RequestVectorParty(columnID)
			requestedVP.WaitForDiskLoad()
			requestedVPs = append(requestedVPs, requestedVP)
		}

		backfillCtx := newBackfillContext(baseBatch, patch, shard.Schema, columnDeletions, sortColumns,
			primaryKeyColumns, dataTypes, defaultValues, shard.HostMemoryManager)

		// Real backfill implementation.
		if err = backfillCtx.backfill(reporter, jobKey); err != nil {
			UnpinVectorParties(requestedVPs)
			backfillCtx.release()
			return
		}

		if backfillCtx.okForEarlyUnpin {
			UnpinVectorParties(requestedVPs)
		}
		backfillCtx.release()

		oldVersion := shard.ArchiveStore.CurrentVersion
		newVersion := NewArchiveStoreVersion(oldVersion.ArchivingCutoff, shard)
		var affected bool
		var purgeOldBatch bool
		if len(backfillCtx.columnsToPurge) == 0 {
			// Batch is clean, we can copy the old batch to new version directly.
			newVersion.Batches[day] = backfillCtx.base
			// clean pointer in cloned batch.
			backfillCtx.new.Columns = nil
		} else {
			affected = true
			purgeOldBatch = true
			numAffectedDays++
			newVersion.Batches[day] = backfillCtx.new
			if err = newVersion.Batches[day].WriteToDisk(); err != nil {
				// Bug fix: without this unpin, vector parties pinned above stay
				// pinned forever on this error path (the normal unpin at the
				// bottom of the loop is never reached).
				if !backfillCtx.okForEarlyUnpin {
					UnpinVectorParties(requestedVPs)
				}
				return
			}
			if err = shard.metaStore.AddArchiveBatchVersion(
				shard.Schema.Schema.Name, shard.ShardID, int(day), newVersion.Batches[day].Version,
				newVersion.Batches[day].SeqNum, newVersion.Batches[day].Size); err != nil {
				// Bug fix: same pin leak as the WriteToDisk error path above.
				if !backfillCtx.okForEarlyUnpin {
					UnpinVectorParties(requestedVPs)
				}
				return
			}
		}

		lockStart := utils.Now()
		// Copy other batches in old version to new version.
		oldVersion.RLock()
		for oldDay, oldBatch := range oldVersion.Batches {
			if oldDay != day {
				newVersion.Batches[oldDay] = oldBatch
			}
		}
		oldVersion.RUnlock()

		// switch to new version
		shard.ArchiveStore.Lock()
		shard.ArchiveStore.CurrentVersion = newVersion
		shard.ArchiveStore.Unlock()

		if !backfillCtx.okForEarlyUnpin {
			UnpinVectorParties(requestedVPs)
		}

		reporter(jobKey, func(status *BackfillJobDetail) {
			status.NumAffectedDays = numAffectedDays
			utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetGauge(utils.BackfillAffectedDays).
				Update(float64(status.NumAffectedDays))
		})

		// Wait for all users of the old version to finish before purging;
		// this wait is accounted as lock duration.
		oldVersion.Users.Wait()
		totalLockDuration += utils.Now().Sub(lockStart)

		oldBatch := backfillCtx.base
		// Purge batches on disk.
		if purgeOldBatch {
			if shard.options.bootstrapToken.AcquireToken(shard.Schema.Schema.Name, uint32(shard.ShardID)) {
				err = shard.diskStore.DeleteBatchVersions(shard.Schema.Schema.Name, shard.ShardID,
					int(oldBatch.BatchID), oldBatch.Version, oldBatch.SeqNum)
				shard.options.bootstrapToken.ReleaseToken(shard.Schema.Schema.Name, uint32(shard.ShardID))
				if err != nil {
					return
				}
			}
		}

		// Purge columns in memory.
		for _, column := range backfillCtx.columnsToPurge {
			column.SafeDestruct()
		}

		// Report memory usage. Hold a user count on the new version so the
		// batch we just installed cannot be swapped out while we read it.
		newVersion.Users.Add(1)
		if affected {
			newVersion.RLock()
			batch := newVersion.Batches[day]
			newVersion.RUnlock()

			batch.RLock()
			for columnID, column := range batch.Columns {
				// Do the nil check in case column is evicted.
				if column != nil {
					bytes := column.GetBytes()
					shard.HostMemoryManager.ReportManagedObject(
						shard.Schema.Schema.Name, shard.ShardID, int(day), columnID, bytes)
				}
			}
			batch.RUnlock()
		}
		newVersion.Users.Done()

		shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(-backfillCtx.unmanagedMemoryBytes)

		reporter(jobKey, func(status *BackfillJobDetail) {
			status.Current = dayIdx
		})
		dayIdx++
	}
	return
}