banyand/backup/lifecycle/progress.go (137 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package lifecycle import ( "encoding/json" "os" "sync" "github.com/apache/skywalking-banyandb/pkg/logger" ) // Progress tracks the lifecycle migration progress to support resume after crash. type Progress struct { CompletedGroups map[string]bool `json:"completed_groups"` CompletedStreams map[string]map[string]bool `json:"completed_streams"` CompletedMeasures map[string]map[string]bool `json:"completed_measures"` DeletedStreamGroups map[string]bool `json:"deleted_stream_groups"` DeletedMeasureGroups map[string]bool `json:"deleted_measure_groups"` mu sync.Mutex `json:"-"` } // NewProgress creates a new Progress tracker. func NewProgress() *Progress { return &Progress{ CompletedGroups: make(map[string]bool), CompletedStreams: make(map[string]map[string]bool), CompletedMeasures: make(map[string]map[string]bool), DeletedStreamGroups: make(map[string]bool), DeletedMeasureGroups: make(map[string]bool), } } // LoadProgress loads progress from a file if it exists. func LoadProgress(path string, l *logger.Logger) *Progress { if path == "" { return NewProgress() } data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { l.Info().Msgf("No existing progress file at %s, starting fresh", path) } else { l.Warn().Err(err).Msgf("Failed to read progress file at %s, starting fresh", path) } return NewProgress() } progress := NewProgress() if err := json.Unmarshal(data, progress); err != nil { l.Warn().Err(err).Msgf("Failed to parse progress file at %s, starting fresh", path) return NewProgress() } l.Info().Msgf("Loaded existing progress from %s", path) return progress } // Save writes the progress to the specified file. func (p *Progress) Save(path string, l *logger.Logger) { if path == "" { return } p.mu.Lock() defer p.mu.Unlock() data, err := json.MarshalIndent(p, "", " ") if err != nil { l.Error().Err(err).Msg("Failed to marshal progress data") return } err = os.WriteFile(path, data, 0o600) if err != nil { l.Error().Err(err).Msgf("Failed to write progress file at %s", path) return } l.Debug().Msg("Progress saved successfully") } // MarkGroupCompleted marks a group as completed. func (p *Progress) MarkGroupCompleted(group string) { p.mu.Lock() defer p.mu.Unlock() p.CompletedGroups[group] = true } // IsGroupCompleted checks if a group has been completed. func (p *Progress) IsGroupCompleted(group string) bool { p.mu.Lock() defer p.mu.Unlock() return p.CompletedGroups[group] } // MarkStreamCompleted marks a stream as completed. func (p *Progress) MarkStreamCompleted(group, stream string) { p.mu.Lock() defer p.mu.Unlock() if p.CompletedStreams[group] == nil { p.CompletedStreams[group] = make(map[string]bool) } p.CompletedStreams[group][stream] = true } // IsStreamCompleted checks if a stream has been completed. func (p *Progress) IsStreamCompleted(group, stream string) bool { p.mu.Lock() defer p.mu.Unlock() if streams, ok := p.CompletedStreams[group]; ok { return streams[stream] } return false } // MarkMeasureCompleted marks a measure as completed. func (p *Progress) MarkMeasureCompleted(group, measure string) { p.mu.Lock() defer p.mu.Unlock() if p.CompletedMeasures[group] == nil { p.CompletedMeasures[group] = make(map[string]bool) } p.CompletedMeasures[group][measure] = true } // IsMeasureCompleted checks if a measure has been completed. func (p *Progress) IsMeasureCompleted(group, measure string) bool { p.mu.Lock() defer p.mu.Unlock() if measures, ok := p.CompletedMeasures[group]; ok { return measures[measure] } return false } // MarkStreamGroupDeleted marks a stream group segments as deleted. func (p *Progress) MarkStreamGroupDeleted(group string) { p.mu.Lock() defer p.mu.Unlock() p.DeletedStreamGroups[group] = true } // IsStreamGroupDeleted checks if a stream group segments have been deleted. func (p *Progress) IsStreamGroupDeleted(group string) bool { p.mu.Lock() defer p.mu.Unlock() return p.DeletedStreamGroups[group] } // MarkMeasureGroupDeleted marks a measure group segments as deleted. func (p *Progress) MarkMeasureGroupDeleted(group string) { p.mu.Lock() defer p.mu.Unlock() p.DeletedMeasureGroups[group] = true } // IsMeasureGroupDeleted checks if a measure group segments have been deleted. func (p *Progress) IsMeasureGroupDeleted(group string) bool { p.mu.Lock() defer p.mu.Unlock() return p.DeletedMeasureGroups[group] } // Remove deletes the progress file. func (p *Progress) Remove(path string, l *logger.Logger) { if path == "" { return } if err := os.Remove(path); err != nil { if !os.IsNotExist(err) { l.Warn().Err(err).Msgf("Failed to remove progress file at %s", path) } return } l.Info().Msgf("Removed progress file at %s", path) }