// Excerpt of memstore/purge.go (lines 23–125): the Purge method of memStoreImpl.

// Purge removes archive batches with IDs in [batchIDStart, batchIDEnd) for the
// given table shard. It proceeds in ordered stages: detach the batches from the
// current in-memory archive version, purge their metadata, delete their data
// files on disk, and finally destruct their in-memory vector parties. Progress
// for each stage is pushed through reporter under this shard's purge job key.
//
// The purge is silently skipped (returns nil) when the shard's bootstrap token
// cannot be acquired (a peer bootstrap job is running) or when the shard no
// longer exists; a non-nil error is returned only on metastore/diskstore
// failures.
func (m *memStoreImpl) Purge(tableName string, shardID, batchIDStart, batchIDEnd int, reporter PurgeJobDetailReporter) error {
	// check if there is peer bootstraping job running, skip this purge if we can not acquire the token
	if m.options.bootstrapToken.AcquireToken(tableName, uint32(shardID)) {
		defer m.options.bootstrapToken.ReleaseToken(tableName, uint32(shardID))
	} else {
		utils.GetLogger().With("table", tableName, "shard", shardID).Error("Purge failed, unable to acquire bootstrap token, retry later")
		return nil
	}

	start := utils.Now()
	jobKey := getIdentifier(tableName, shardID, memCom.PurgeJobType)
	purgeTimer := utils.GetReporter(tableName, shardID).GetTimer(utils.PurgeTimingTotal)
	// Record total duration and bump the purge counter on every exit path,
	// including the early error returns below.
	defer func() {
		duration := utils.Now().Sub(start)
		purgeTimer.Record(duration)
		reporter(jobKey, func(status *PurgeJobDetail) {
			status.LastDuration = duration
		})
		utils.GetReporter(tableName, shardID).
			GetCounter(utils.PurgeCount).Inc(1)
	}()

	shard, err := m.GetTableShard(tableName, shardID)
	if err != nil {
		// Shard lookup failure is treated as "nothing to purge", not an error.
		utils.GetLogger().With("table", tableName, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
		return nil
	}
	// Release the shard usage reference taken by GetTableShard.
	defer shard.Users.Done()

	// Pin the current archive version for the duration of the purge; released
	// when this function returns.
	currentVersion := shard.ArchiveStore.GetCurrentVersion()
	defer currentVersion.Users.Done()

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeMetaData
		status.BatchIDStart = batchIDStart
		status.BatchIDEnd = batchIDEnd
	})

	// detach batch from memory
	// Remove in-range batches from the version's batch map under its lock so
	// new readers can no longer reach them; their memory is reclaimed later,
	// after metadata and disk files are gone.
	var batchesToPurge []*ArchiveBatch
	currentVersion.Lock()
	for batchID, batch := range currentVersion.Batches {
		if batchID >= int32(batchIDStart) && batchID < int32(batchIDEnd) {
			delete(currentVersion.Batches, batchID)
			if batch != nil {
				batchesToPurge = append(batchesToPurge, batch)
			}
		}
	}
	currentVersion.Unlock()

	// delete metadata of batches within range
	// NOTE(review): metadata is purged before data files — presumably so a
	// crash mid-purge leaves orphaned files rather than metadata pointing at
	// deleted data; confirm against recovery logic.
	err = shard.metaStore.PurgeArchiveBatches(tableName, shardID, batchIDStart, batchIDEnd)
	if err != nil {
		return err
	}

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeDataFile
	})

	// delete data file on disk of batches within range
	numBatches, err := shard.diskStore.DeleteBatches(tableName, shardID, batchIDStart, batchIDEnd)
	if err != nil {
		return err
	}
	reporter(jobKey, func(status *PurgeJobDetail) {
		status.NumBatches = numBatches
	})
	utils.GetReporter(tableName, shardID).GetCounter(utils.PurgedBatches).Inc(int64(numBatches))

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeMemory
		status.Current = 0
		status.Total = len(batchesToPurge)
	})

	// Free the detached batches' in-memory columns. Note status.Current reports
	// the slice index, not the batch ID.
	for id, batch := range batchesToPurge {
		batch.Lock()
		reporter(jobKey, func(status *PurgeJobDetail) {
			status.Current = id
		})
		for columnID, vp := range batch.Columns {
			if vp != nil {
				// wait for users to finish
				// WaitForUsers(true) blocks while the batch lock is held until
				// all readers of this vector party have drained; only then is
				// it safe to destruct and report zero managed memory.
				vp.(memCom.ArchiveVectorParty).WaitForUsers(true)
				vp.SafeDestruct()
				shard.HostMemoryManager.ReportManagedObject(tableName, shardID, int(batch.BatchID), columnID, 0)
			}
		}
		batch.Unlock()
	}

	reporter(jobKey, func(status *PurgeJobDetail) {
		status.Stage = PurgeComplete
	})

	// Stamp the last successful purge time for the periodic purge scheduler.
	shard.ArchiveStore.PurgeManager.Lock()
	shard.ArchiveStore.PurgeManager.LastPurgeTime = utils.Now()
	shard.ArchiveStore.PurgeManager.Unlock()

	return nil
}