// Excerpt: (*memStoreImpl).Archive, from memstore/archiving.go [251:367].

// Archive advances the archiving cutoff of the given table shard to cutoff:
// it merges eligible live-store records into a new archive store version,
// checkpoints the redo log, purges obsolete archive batches (on disk and in
// memory) and the archived live-store batches, and reports stage/duration
// progress through reporter under this shard's archiving job key.
//
// It returns nil when the shard no longer exists (treated as a benign race
// with table deletion), an error when cutoff does not advance past the
// current archiving cutoff, and otherwise any error from the merge, redo-log
// checkpoint, or disk purge steps.
func (m *memStoreImpl) Archive(table string, shardID int, cutoff uint32, reporter ArchiveJobDetailReporter) error {
	archivingTimer := utils.GetReporter(table, shardID).GetTimer(utils.ArchivingTimingTotal)
	start := utils.Now()
	jobKey := getIdentifier(table, shardID, memCom.ArchivingJobType)
	// Emit duration metrics and report back to scheduler. Runs on every exit
	// path, including early returns and error returns.
	defer func() {
		duration := utils.Now().Sub(start)
		archivingTimer.Record(duration)
		reporter(jobKey, func(status *ArchiveJobDetail) {
			status.LastDuration = duration
		})
		utils.GetReporter(table, shardID).
			GetCounter(utils.ArchivingCount).Inc(1)
	}()

	// Record the cutoff this run is working toward before doing any work, so
	// the job detail reflects an in-progress run.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.RunningCutoff = cutoff
	})

	shard, err := m.GetTableShard(table, shardID)
	if err != nil {
		// table already deleted. stop here
		utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
		return nil
	}
	// GetTableShard pins the shard; release the pin on every exit path.
	defer shard.Users.Done()

	// The cutoff must strictly advance; a stale or equal cutoff is an error.
	if cutoff <= shard.ArchiveStore.CurrentVersion.ArchivingCutoff {
		return utils.StackError(nil, "Cutoff %d is no greater than current cutoff %d",
			cutoff, shard.ArchiveStore.CurrentVersion.ArchivingCutoff)
	}

	// Update the archiving cutoff time high water mark so ingestion won't update records below
	// the new target archiving cutoff time. Done under the writer lock so no
	// ingestion write can interleave with the watermark/primary-key update.
	shard.LiveStore.WriterLock.Lock()
	shard.LiveStore.ArchivingCutoffHighWatermark = cutoff
	shard.LiveStore.PrimaryKey.UpdateEventTimeCutoff(cutoff)
	shard.LiveStore.WriterLock.Unlock()

	utils.GetReporter(table, shardID).GetGauge(utils.ArchivingHighWatermark).Update(float64(cutoff))

	// Create a new archive store version and switch to it. patchByDay keys the
	// set of day batches touched by this run; unmanagedMemoryBytes is the
	// temporary merge memory to be credited back after reporting below.
	patchByDay, oldVersion, unmanagedMemoryBytes, err := shard.createNewArchiveStoreVersion(cutoff, reporter, jobKey)
	if err != nil {
		if err == metaCom.ErrTableDoesNotExist {
			// Table was deleted mid-run; treat as benign, same as the shard
			// lookup failure above.
			utils.GetLogger().With("table", table, "shard", shardID).Warn("failed to create archive store version for non-exist table")
			return nil
		}
		return err
	}

	// Wait for queries in other goroutines to prevent archiving from prematurely purging the old version.
	oldVersion.Users.Wait()

	// Purge redo log files on disk.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.Stage = ArchivingPurge
	})

	// Redo logs can only be checkpointed up to what backfill has durably
	// consumed, hence the backfill manager's last file/offset bound.
	backfillMgr := shard.LiveStore.BackfillManager
	if err := shard.LiveStore.RedoLogManager.
		CheckpointRedolog(cutoff, backfillMgr.LastRedoFile,
			backfillMgr.LastBatchOffset); err != nil {
		return err
	}

	// delete obsolete batch versions if there are no peer bootstraping job running
	// (holding the token would let a peer stream data that we are about to
	// delete; in that case skip the on-disk purge and only free memory).
	canDeleteData := false
	if m.options.bootstrapToken.AcquireToken(table, uint32(shardID)) {
		canDeleteData = true
		defer m.options.bootstrapToken.ReleaseToken(table, uint32(shardID))
	}
	// Delete obsolete (merged base batch) vector parties in oldArchivedStore. We don't need any lock since all queries
	// should already finish processing the old version.
	for day := range patchByDay {
		// We don't need to check existence again since it should be already created if missing in merge stage.
		batch := oldVersion.Batches[day]
		// Purge archive batch on disk.
		if canDeleteData {
			if err := m.diskStore.DeleteBatchVersions(table, shardID, int(day), batch.Version, batch.SeqNum); err != nil {
				return err
			}
		}
		// Purge archive batch in memory.
		batch.SafeDestruct()
	}

	// Report memory usage.
	// NOTE(review): GetCurrentVersion appears to take a user reference on the
	// returned version (hence the paired Done) — confirm against its impl.
	newVersion := shard.ArchiveStore.GetCurrentVersion()
	defer newVersion.Users.Done()

	// Re-report the merged batches' column memory as managed, so the host
	// memory manager tracks it for eviction accounting.
	for day := range patchByDay {
		batch := newVersion.Batches[day]
		batch.RLock()
		for columnID, column := range batch.Columns {
			if column != nil {
				// The new merged batch is no longer unmanaged memory.
				bytes := column.GetBytes()
				shard.HostMemoryManager.ReportManagedObject(shard.Schema.Schema.Name, shardID, int(day), columnID, bytes)
			}
		}
		batch.RUnlock()
	}
	// Credit back the temporary unmanaged memory consumed during the merge.
	shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(-unmanagedMemoryBytes)

	// Purge live store in memory: drop live batches fully covered by the new cutoff.
	batchIDsToPurge := shard.LiveStore.getBatchIDsToPurge(cutoff)
	shard.LiveStore.PurgeBatches(batchIDsToPurge)

	// Mark the run complete and roll the cutoff bookkeeping forward.
	reporter(jobKey, func(status *ArchiveJobDetail) {
		status.Stage = ArchivingComplete
		status.LastCutoff = status.CurrentCutoff
		status.CurrentCutoff = cutoff
	})
	utils.GetReporter(table, shardID).GetGauge(utils.ArchivingLowWatermark).Update(float64(cutoff))
	return nil
}