internal/backup/log_entry.go (441 lines of code) (raw):
package backup
import (
"container/list"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/archive"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
logging "gitlab.com/gitlab-org/gitaly/v16/internal/log"
)
const (
// minRetryWait is the shortest duration to backoff before retrying, and is also the initial backoff duration.
minRetryWait = 5 * time.Second
// maxRetryWait is the longest duration to backoff before retrying.
maxRetryWait = 5 * time.Minute
)
// logEntry is used to track the state of a backup request.
type logEntry struct {
partitionInfo PartitionInfo
lsn storage.LSN
success bool
}
// newLogEntry constructs a new logEntry.
func newLogEntry(partitionInfo PartitionInfo, lsn storage.LSN) *logEntry {
return &logEntry{
partitionInfo: partitionInfo,
lsn: lsn,
}
}
// partitionNotification is used to store the data received by NotifyNewEntries.
type partitionNotification struct {
lowWaterMark storage.LSN
highWaterMark storage.LSN
partitionInfo PartitionInfo
}
// newPartitionNotification constructs a new partitionNotification.
func newPartitionNotification(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) *partitionNotification {
return &partitionNotification{
partitionInfo: PartitionInfo{
StorageName: storageName,
PartitionID: partitionID,
},
lowWaterMark: lowWaterMark,
highWaterMark: highWaterMark,
}
}
// partitionState tracks the progress made on one partition.
type partitionState struct {
// nextLSN is the next LSN to be backed up.
nextLSN storage.LSN
// highWaterMark is the highest LSN to be backed up.
highWaterMark storage.LSN
// hasJob indicates if a backup job is currently being processed for this partition.
hasJob bool
}
// newPartitionState constructs a new partitionState.
func newPartitionState(nextLSN, highWaterMark storage.LSN) *partitionState {
return &partitionState{
nextLSN: nextLSN,
highWaterMark: highWaterMark,
}
}
// PartitionInfo is the global identifier for a partition.
type PartitionInfo struct {
StorageName string
PartitionID storage.PartitionID
}
// LogEntryArchiver is used to backup applied log entries. It has a configurable number of
// worker goroutines that will perform backups. Each partition may only have one backup
// executing at a time, entries are always processed in-order. Backup failures will trigger
// an exponential backoff.
type LogEntryArchiver struct {
// logger is the logger to use to write log messages.
logger logging.Logger
// store is where the log archives are kept.
store LogEntryStore
// node is used to access the LogManagers.
node *storage.Node
// notificationCh is the channel used to signal that a new notification has arrived.
notificationCh chan struct{}
// workCh is the channel used to signal that the archiver should try to process more jobs.
workCh chan struct{}
// closingCh is the channel used to signal that the LogEntryArchiver should exit.
closingCh chan struct{}
// closedCh is the channel used to wait for the archiver to completely stop.
closedCh chan struct{}
// notifications is the list of log notifications to ingest. notificationsMutex must be held when accessing it.
notifications *list.List
// notificationsMutex is used to synchronize access to notifications.
notificationsMutex sync.Mutex
// partitionStates tracks the current LSN and entry backlog of each partition in a storage.
partitionStates map[PartitionInfo]*partitionState
// activePartitions tracks with partitions need to be processed.
activePartitions map[PartitionInfo]struct{}
// activeJobs tracks how many entries are currently being backed up.
activeJobs uint
// workerCount sets the number of goroutines used to perform backups.
workerCount uint
// waitDur controls how long to wait before retrying when a backup attempt fails.
waitDur time.Duration
// tickerFunc allows the archiver to wait with an exponential backoff between retries.
tickerFunc func(time.Duration) helper.Ticker
// backupCounter provides metrics with a count of the number of WAL entries backed up by status.
backupCounter *prometheus.CounterVec
// backupLatency provides metrics on the latency of WAL backup operations.
backupLatency prometheus.Histogram
}
// NewLogEntryArchiver constructs a new LogEntryArchiver.
func NewLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node) *LogEntryArchiver {
return newLogEntryArchiver(logger, archiveSink, workerCount, node, helper.NewTimerTicker)
}
// newLogEntryArchiver constructs a new LogEntryArchiver with a configurable ticker function.
func newLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node, tickerFunc func(time.Duration) helper.Ticker) *LogEntryArchiver {
if workerCount < 1 {
workerCount = 1
}
archiver := &LogEntryArchiver{
logger: logger,
store: NewLogEntryStore(archiveSink),
node: node,
notificationCh: make(chan struct{}, 1),
workCh: make(chan struct{}, 1),
closingCh: make(chan struct{}),
closedCh: make(chan struct{}),
notifications: list.New(),
partitionStates: make(map[PartitionInfo]*partitionState),
activePartitions: make(map[PartitionInfo]struct{}),
workerCount: workerCount,
tickerFunc: tickerFunc,
waitDur: minRetryWait,
backupCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gitaly_wal_backup_count",
Help: "Counter of the number of WAL entries backed up by status",
},
[]string{"status"},
),
backupLatency: prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "gitaly_wal_backup_latency_seconds",
Help: "Latency of WAL entry backups",
},
),
}
return archiver
}
// NotifyNewEntries passes the log entry information to the LogEntryArchiver for processing.
func (la *LogEntryArchiver) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) {
la.notificationsMutex.Lock()
defer la.notificationsMutex.Unlock()
la.notifications.PushBack(newPartitionNotification(storageName, partitionID, lowWaterMark, highWaterMark))
select {
case la.notificationCh <- struct{}{}:
// Archiver has a pending notification already, no further action needed.
default:
}
}
// Run starts log entry archiving.
func (la *LogEntryArchiver) Run() {
go func() {
la.logger.Info("log entry archiver: started")
defer func() {
la.notificationsMutex.Lock()
defer la.notificationsMutex.Unlock()
la.logger.WithField("pending_entries", la.notifications.Len()).Info("log entry archiver: stopped")
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sendCh := make(chan *logEntry)
recvCh := make(chan *logEntry)
var wg sync.WaitGroup
for i := uint(0); i < la.workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
la.processEntries(ctx, sendCh, recvCh)
}()
}
la.main(ctx, sendCh, recvCh)
close(sendCh)
// Interrupt any running backups so we can exit quickly.
cancel()
// Wait for all workers to exit.
wg.Wait()
}()
}
// Close stops the LogEntryArchiver, causing Run to return.
func (la *LogEntryArchiver) Close() {
close(la.closingCh)
<-la.closedCh
}
// main is the main loop of the LogEntryArchiver. New notifications are ingested, jobs
// are sent to workers, and the result of jobs are received.
func (la *LogEntryArchiver) main(ctx context.Context, sendCh, recvCh chan *logEntry) {
defer close(la.closedCh)
for {
// Triggering sendEntries via workCh may not process all entries if there
// are more active partitions than workers or more than one entry to process
// in a partition. We will need to call it repeatedly to work through the
// backlog. If there are no available jobs or workers sendEntries is a no-op.
la.sendEntries(sendCh)
select {
case <-la.workCh:
la.sendEntries(sendCh)
case <-la.notificationCh:
la.ingestNotifications(ctx)
case entry := <-recvCh:
la.receiveEntry(ctx, entry)
case <-la.closingCh:
return
}
}
}
// sendEntries sends available log entries to worker goroutines for processing.
// It may consume up to as many entries as there are available workers.
func (la *LogEntryArchiver) sendEntries(sendCh chan *logEntry) {
// We use a map to randomize partition processing order. Map access is not
// truly random or completely fair, but it's close enough for our purposes.
for partitionInfo := range la.activePartitions {
// All workers are busy, go back to waiting.
if la.activeJobs == la.workerCount {
return
}
state := la.partitionStates[partitionInfo]
if state.hasJob {
continue
}
state.hasJob = true
sendCh <- newLogEntry(partitionInfo, state.nextLSN)
la.activeJobs++
}
}
// ingestNotifications read all new notifications and updates partition states.
func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) {
for {
notification := la.popNextNotification()
if notification == nil {
return
}
state, ok := la.partitionStates[notification.partitionInfo]
if !ok {
state = newPartitionState(notification.lowWaterMark, notification.highWaterMark)
la.partitionStates[notification.partitionInfo] = state
}
// We have already backed up all entries sent by the LogManager, but the manager is
// not aware of this. Acknowledge again with our last processed entry.
if state.nextLSN > notification.highWaterMark {
if err := la.callLogReader(ctx, notification.partitionInfo, func(lm storage.LogReader) error {
return lm.AcknowledgePosition(log.ConsumerPosition, state.nextLSN-1)
}); err != nil {
la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry")
}
continue
}
// We expect our next LSN to be at or above the oldest LSN available for backup. If not,
// we will be unable to backup the full sequence.
if state.nextLSN < notification.lowWaterMark {
la.logger.WithFields(
logging.Fields{
"storage": notification.partitionInfo.StorageName,
"partition_id": notification.partitionInfo.PartitionID,
"expected_lsn": state.nextLSN,
"actual_lsn": notification.lowWaterMark,
}).Error("log entry archiver: gap in log sequence")
// The LogManager reports that it no longer has our expected
// LSN available for consumption. Skip ahead to the oldest entry
// still present.
state.nextLSN = notification.lowWaterMark
}
state.highWaterMark = notification.highWaterMark
// Mark partition as active.
la.activePartitions[notification.partitionInfo] = struct{}{}
la.notifyNewEntries()
}
}
func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogReader) error) error {
storageHandle, err := (*la.node).GetStorage(partitionInfo.StorageName)
if err != nil {
return fmt.Errorf("get storage: %w", err)
}
partition, err := storageHandle.GetPartition(ctx, partitionInfo.PartitionID)
if err != nil {
return fmt.Errorf("get partition: %w", err)
}
defer partition.Close()
if err := callback(partition.GetLogReader()); err != nil {
return fmt.Errorf("acknowledge consumer position: %w", err)
}
return nil
}
// receiveEntry handles the result of a backup job. If the backup failed, then it
// will block for la.waitDur to allow the conditions that caused the failure to resolve
// themselves. Continued failure results in an exponential backoff.
func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) {
la.activeJobs--
state := la.partitionStates[entry.partitionInfo]
state.hasJob = false
if !entry.success {
// It is likely that a problem with one backup will impact others, e.g.
// connectivity issues with object storage. Wait to avoid a thundering
// herd of retries.
la.backoff()
return
}
state.nextLSN++
// All entries in partition have been backed up, the partition is dormant.
if state.nextLSN > state.highWaterMark {
delete(la.activePartitions, entry.partitionInfo)
}
// Decrease backoff on success.
la.waitDur /= 2
if la.waitDur < minRetryWait {
la.waitDur = minRetryWait
}
if err := la.callLogReader(ctx, entry.partitionInfo, func(lm storage.LogReader) error {
return lm.AcknowledgePosition(log.ConsumerPosition, entry.lsn)
}); err != nil {
la.logger.WithError(err).WithFields(
logging.Fields{
"storage": entry.partitionInfo.StorageName,
"partition_id": entry.partitionInfo.PartitionID,
"lsn": entry.lsn,
}).Error("log entry archiver: failed to get LogManager for newly completed entry")
}
}
// processEntries is executed by worker goroutines. This performs the actual backups.
func (la *LogEntryArchiver) processEntries(ctx context.Context, inCh, outCh chan *logEntry) {
for entry := range inCh {
la.processEntry(ctx, entry)
outCh <- entry
}
}
// processEntry checks if an existing backup exists, and performs a backup if not present.
func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) {
logger := la.logger.WithFields(logging.Fields{
"storage": entry.partitionInfo.StorageName,
"partition_id": entry.partitionInfo.PartitionID,
"lsn": entry.lsn,
})
var entryPath string
if err := la.callLogReader(context.Background(), entry.partitionInfo, func(lm storage.LogReader) error {
entryPath = lm.GetEntryPath(entry.lsn)
return nil
}); err != nil {
la.backupCounter.WithLabelValues("fail").Add(1)
la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for entry path")
return
}
backupExists, err := la.store.Exists(ctx, entry.partitionInfo, entry.lsn)
if err != nil {
la.backupCounter.WithLabelValues("fail").Add(1)
logger.WithError(err).Error("log entry archiver: checking for existing log entry backup")
return
}
if backupExists {
// Don't increment backupCounter, we didn't perform a backup.
entry.success = true
return
}
if err := la.backupLogEntry(ctx, entry.partitionInfo, entry.lsn, entryPath); err != nil {
la.backupCounter.WithLabelValues("fail").Add(1)
logger.WithError(err).Error("log entry archiver: failed to backup log entry")
return
}
la.backupCounter.WithLabelValues("success").Add(1)
entry.success = true
}
// backupLogEntry tar's the root directory of the log entry and writes it to the Sink.
func (la *LogEntryArchiver) backupLogEntry(ctx context.Context, partitionInfo PartitionInfo, lsn storage.LSN, entryPath string) (returnErr error) {
timer := prometheus.NewTimer(la.backupLatency)
defer timer.ObserveDuration()
// Create a new context to abort the write on failure.
writeCtx, cancelWrite := context.WithCancel(ctx)
defer cancelWrite()
w, err := la.store.GetWriter(writeCtx, partitionInfo, lsn)
if err != nil {
return fmt.Errorf("get backup writer: %w", err)
}
defer func() {
if err := w.Close(); err != nil && returnErr == nil {
returnErr = fmt.Errorf("close log entry backup writer: %w", err)
}
}()
entryParent := filepath.Dir(entryPath)
entryName := filepath.Base(entryPath)
if err := archive.WriteTarball(writeCtx, la.logger, w, entryParent, entryName); err != nil {
// End the context before calling Close to ensure we don't persist the failed
// write to object storage.
cancelWrite()
return fmt.Errorf("backup log archive: %w", err)
}
return nil
}
// popNextNotification removes the next entry from the head of the list.
func (la *LogEntryArchiver) popNextNotification() *partitionNotification {
la.notificationsMutex.Lock()
defer la.notificationsMutex.Unlock()
front := la.notifications.Front()
if front == nil {
return nil
}
return la.notifications.Remove(front).(*partitionNotification)
}
// notifyNewEntries alerts the LogEntryArchiver that new entries are available to backup.
func (la *LogEntryArchiver) notifyNewEntries() {
select {
case la.workCh <- struct{}{}:
// There is already a pending notification, proceed.
default:
}
}
// backoff sleeps for waitDur and doubles the duration for the next backoff call.
func (la *LogEntryArchiver) backoff() {
ticker := la.tickerFunc(la.waitDur)
ticker.Reset()
select {
case <-la.closingCh:
ticker.Stop()
case <-ticker.C():
}
la.waitDur *= 2
if la.waitDur > maxRetryWait {
la.waitDur = maxRetryWait
}
}
// Describe is used to describe Prometheus metrics.
func (la *LogEntryArchiver) Describe(descs chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(la, descs)
}
// Collect is used to collect Prometheus metrics.
func (la *LogEntryArchiver) Collect(metrics chan<- prometheus.Metric) {
la.backupCounter.Collect(metrics)
}
// LogEntryStore manages uploaded log entry archives in object storage.
type LogEntryStore struct {
sink *Sink
}
// NewLogEntryStore returns a new LogEntryStore.
func NewLogEntryStore(sink *Sink) LogEntryStore {
return LogEntryStore{
sink: sink,
}
}
// Exists returns true if a log entry for the specified partition and LSN exists in the store.
func (s LogEntryStore) Exists(ctx context.Context, info PartitionInfo, lsn storage.LSN) (bool, error) {
exists, err := s.sink.Exists(ctx, archivePath(info, lsn))
if err != nil {
return false, fmt.Errorf("exists: %w", err)
}
return exists, nil
}
// GetReader returns a reader in order to read a log entry from the store.
func (s *LogEntryStore) GetReader(ctx context.Context, info PartitionInfo, lsn storage.LSN) (io.ReadCloser, error) {
r, err := s.sink.GetReader(ctx, archivePath(info, lsn))
if err != nil {
return nil, fmt.Errorf("get reader: %w", err)
}
return r, nil
}
// GetWriter returns a writer in order to write a new log entry into the store.
func (s LogEntryStore) GetWriter(ctx context.Context, info PartitionInfo, lsn storage.LSN) (io.WriteCloser, error) {
w, err := s.sink.GetWriter(ctx, archivePath(info, lsn))
if err != nil {
return nil, fmt.Errorf("get writer: %w", err)
}
return w, nil
}
// Query returns an iterator that finds all log entries in the store for the
// given partition starting at the LSN specified by from.
func (s LogEntryStore) Query(info PartitionInfo, from storage.LSN) *LogEntryIterator {
it := s.sink.List(partitionDir(info))
return &LogEntryIterator{it: it, from: from}
}
// LogEntryIterator iterates over archived log entries in object-storage.
type LogEntryIterator struct {
it *ListIterator
from storage.LSN
err error
lsn storage.LSN
path string
}
// Next iterates to the next item. Returns false if there are no more results.
func (it *LogEntryIterator) Next(ctx context.Context) bool {
if it.err != nil {
return false
}
ok := it.it.Next(ctx)
if !ok {
return false
}
it.lsn, it.err = extractLSN(it.it.Path())
if it.err != nil {
return false
}
for it.lsn < it.from {
ok = it.it.Next(ctx)
if !ok {
return false
}
it.lsn, it.err = extractLSN(it.it.Path())
if it.err != nil {
return false
}
}
return true
}
// Err returns a iteration error if there were any.
func (it *LogEntryIterator) Err() error {
return it.err
}
// Path of the current log entry.
func (it *LogEntryIterator) Path() string {
return it.path
}
// LSN of the current log entry.
func (it *LogEntryIterator) LSN() storage.LSN {
return it.lsn
}
func extractLSN(path string) (storage.LSN, error) {
rawLSN := strings.TrimSuffix(filepath.Base(path), ".tar")
return storage.ParseLSN(rawLSN)
}
func partitionDir(info PartitionInfo) string {
return filepath.Join(info.StorageName, fmt.Sprintf("%d", info.PartitionID))
}
func archivePath(info PartitionInfo, lsn storage.LSN) string {
return filepath.Join(partitionDir(info), lsn.String()+".tar")
}