// Excerpt from memstore/backfill.go (original lines 108-266).

// createNewArchiveStoreVersionForBackfill applies the accumulated backfill
// patches (keyed by day / batch ID) to the shard's archive store. For each
// affected day it builds a patched batch, persists it (disk + metastore),
// publishes a brand-new ArchiveStoreVersion that replaces the current one,
// waits for readers of the old version to drain, and then purges superseded
// on-disk batch versions and in-memory columns. Progress, lock duration and
// the number of affected days are surfaced through reporter under jobKey.
// Returns the first error encountered; on error some cleanup (e.g. vector
// party unpinning) may be skipped — see NOTE(review) comments below.
func (shard *TableShard) createNewArchiveStoreVersionForBackfill(
	backfillPatches map[int32]*backfillPatch, reporter BackfillJobDetailReporter, jobKey string) (err error) {
	// Block column deletion
	shard.columnDeletion.Lock()
	defer shard.columnDeletion.Unlock()

	// Snapshot schema
	// Copy out everything needed under one RLock so the rest of the job
	// works against a consistent view even if the schema changes later.
	shard.Schema.RLock()
	columnDeletions := shard.Schema.GetColumnDeletions()
	sortColumns := shard.Schema.Schema.ArchivingSortColumns
	primaryKeyColumns := shard.Schema.Schema.PrimaryKeyColumns
	dataTypes := shard.Schema.ValueTypeByColumn
	defaultValues := shard.Schema.DefaultValues
	numColumns := len(shard.Schema.ValueTypeByColumn)
	shard.Schema.RUnlock()

	var numAffectedDays int
	// dayIdx is 1-based progress counter across all patched days.
	dayIdx := 1
	reporter(jobKey, func(status *BackfillJobDetail) {
		status.Stage = BackfillApplyPatch
		status.Current = 0
		status.Total = len(backfillPatches)
	})

	lockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).
		GetTimer(utils.BackfillLockTiming)
	// totalLockDuration accumulates, per day, the span from just before the
	// old-version copy until old readers have drained (includes Users.Wait).
	var totalLockDuration time.Duration
	defer func() {
		lockTimer.Record(totalLockDuration)
		reporter(jobKey, func(status *BackfillJobDetail) {
			status.LockDuration = totalLockDuration
		})
	}()

	// Only those batches that are affected and changed need to be cleaned.
	for day, patch := range backfillPatches {
		baseBatch := shard.ArchiveStore.CurrentVersion.RequestBatch(day)

		// Pin every column of the base batch and block until each one is
		// resident in memory; pins are dropped via UnpinVectorParties below.
		var requestedVPs []memCom.ArchiveVectorParty
		for columnID := 0; columnID < numColumns; columnID++ {
			requestedVP := baseBatch.RequestVectorParty(columnID)
			requestedVP.WaitForDiskLoad()
			requestedVPs = append(requestedVPs, requestedVP)
		}

		backfillCtx := newBackfillContext(baseBatch, patch, shard.Schema, columnDeletions, sortColumns,
			primaryKeyColumns, dataTypes, defaultValues, shard.HostMemoryManager)

		// Real backfill implementation.
		if err = backfillCtx.backfill(reporter, jobKey); err != nil {
			UnpinVectorParties(requestedVPs)
			backfillCtx.release()
			return
		}

		// If the backfill no longer needs the pinned base columns, unpin
		// early so eviction is not blocked for the rest of this iteration.
		if backfillCtx.okForEarlyUnpin {
			UnpinVectorParties(requestedVPs)
		}
		// NOTE(review): fields of backfillCtx (base, new, columnsToPurge,
		// unmanagedMemoryBytes) are still read after release() — presumably
		// release only frees scratch resources; confirm in backfillContext.
		backfillCtx.release()

		oldVersion := shard.ArchiveStore.CurrentVersion
		newVersion := NewArchiveStoreVersion(oldVersion.ArchivingCutoff, shard)

		var affected bool
		var purgeOldBatch bool
		if len(backfillCtx.columnsToPurge) == 0 {
			// Batch is clean, we can copy the old batch to new version directly.
			newVersion.Batches[day] = backfillCtx.base
			// clean pointer in cloned batch.
			backfillCtx.new.Columns = nil
		} else {
			affected = true
			purgeOldBatch = true
			numAffectedDays++
			newVersion.Batches[day] = backfillCtx.new
			// Persist the patched batch before publishing the new version so
			// readers never see an unflushed batch.
			// NOTE(review): on error here (and below) requestedVPs remain
			// pinned when okForEarlyUnpin is false — confirm this leak is
			// acceptable / handled by the caller.
			if err = newVersion.Batches[day].WriteToDisk(); err != nil {
				return
			}
			if err = shard.metaStore.AddArchiveBatchVersion(
				shard.Schema.Schema.Name, shard.ShardID, int(day), newVersion.Batches[day].Version,
				newVersion.Batches[day].SeqNum, newVersion.Batches[day].Size); err != nil {
				return
			}
		}

		lockStart := utils.Now()
		// Copy other batches in old version to new version.
		oldVersion.RLock()
		for oldDay, oldBatch := range oldVersion.Batches {
			if oldDay != day {
				newVersion.Batches[oldDay] = oldBatch
			}
		}
		oldVersion.RUnlock()

		// switch to new version
		shard.ArchiveStore.Lock()
		shard.ArchiveStore.CurrentVersion = newVersion
		shard.ArchiveStore.Unlock()

		// Late unpin path: the base columns were needed up to the version
		// swap; release the pins now that the new version is published.
		if !backfillCtx.okForEarlyUnpin {
			UnpinVectorParties(requestedVPs)
		}

		reporter(jobKey, func(status *BackfillJobDetail) {
			status.NumAffectedDays = numAffectedDays
			utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetGauge(utils.BackfillAffectedDays).
				Update(float64(status.NumAffectedDays))
		})

		// Wait for all readers of the old version to finish before purging
		// anything they might still reference.
		oldVersion.Users.Wait()
		totalLockDuration += utils.Now().Sub(lockStart)

		oldBatch := backfillCtx.base
		// Purge batches on disk.
		if purgeOldBatch {
			// Bootstrap token guards against deleting files while a peer is
			// bootstrapping from this shard; skip the purge if unavailable.
			if shard.options.bootstrapToken.AcquireToken(shard.Schema.Schema.Name, uint32(shard.ShardID)) {
				err = shard.diskStore.DeleteBatchVersions(shard.Schema.Schema.Name, shard.ShardID,
					int(oldBatch.BatchID), oldBatch.Version, oldBatch.SeqNum)
				shard.options.bootstrapToken.ReleaseToken(shard.Schema.Schema.Name, uint32(shard.ShardID))
				if err != nil {
					return
				}
			}
		}

		// Purge columns in memory.
		for _, column := range backfillCtx.columnsToPurge {
			column.SafeDestruct()
		}

		// Report memory usage.
		// Hold a user count on the new version so it cannot be swapped out
		// and purged while we walk its columns.
		newVersion.Users.Add(1)
		if affected {
			newVersion.RLock()
			batch := newVersion.Batches[day]
			newVersion.RUnlock()
			batch.RLock()
			for columnID, column := range batch.Columns {
				// Do the nil check in case column is evicted.
				if column != nil {
					bytes := column.GetBytes()
					shard.HostMemoryManager.ReportManagedObject(
						shard.Schema.Schema.Name, shard.ShardID, int(day), columnID, bytes)
				}
			}
			batch.RUnlock()
		}
		newVersion.Users.Done()
		// The patched data moved from unmanaged (live-store/backfill) space
		// into managed archive space; report the reduction.
		shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(-backfillCtx.unmanagedMemoryBytes)

		reporter(jobKey, func(status *BackfillJobDetail) {
			status.Current = dayIdx
		})
		dayIdx++
	}

	return
}