banyand/measure/snapshot.go (231 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "context" "encoding/json" "fmt" "path/filepath" "sync" "sync/atomic" "time" "github.com/pkg/errors" "go.uber.org/multierr" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/schema" ) func (tst *tsTable) currentSnapshot() *snapshot { tst.RLock() defer tst.RUnlock() if tst.snapshot == nil { return nil } s := tst.snapshot s.incRef() return s } type snapshotCreator rune const ( snapshotCreatorMemPart = iota snapshotCreatorFlusher snapshotCreatorMerger snapshotCreatorMergedFlusher ) type snapshot struct { parts []*partWrapper epoch uint64 creator snapshotCreator ref int32 } func (s *snapshot) getParts(dst []*part, minTimestamp, maxTimestamp int64) ([]*part, int) { var count int for _, p := range s.parts { pm := p.p.partMetadata if maxTimestamp < pm.MinTimestamp || minTimestamp > pm.MaxTimestamp { continue } dst = append(dst, p.p) count++ } return dst, count } func (s *snapshot) incRef() { atomic.AddInt32(&s.ref, 1) } func (s *snapshot) decRef() { n := atomic.AddInt32(&s.ref, -1) if n > 0 { return } for i := range s.parts { s.parts[i].decRef() } s.parts = s.parts[:0] } func (s *snapshot) copyAllTo(nextEpoch uint64) snapshot { var result snapshot result.epoch = nextEpoch result.ref = 1 for i := range s.parts { s.parts[i].incRef() result.parts = append(result.parts, s.parts[i]) } return result } func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) snapshot { var result snapshot result.epoch = nextEpoch result.ref = 1 for i := 0; i < len(s.parts); i++ { if n, ok := nextParts[s.parts[i].ID()]; ok { result.parts = append(result.parts, n) continue } s.parts[i].incRef() result.parts = append(result.parts, s.parts[i]) } return result } func (s *snapshot) remove(nextEpoch uint64, merged map[uint64]struct{}) snapshot { var result snapshot result.epoch = nextEpoch result.ref = 1 for i := 0; i < len(s.parts); i++ { if _, ok := merged[s.parts[i].ID()]; !ok { s.parts[i].incRef() result.parts = append(result.parts, s.parts[i]) continue } s.parts[i].removable.Store(true) } return result } func snapshotName(snapshot uint64) string { return fmt.Sprintf("%016x%s", snapshot, snapshotSuffix) } func parseSnapshot(name string) (uint64, error) { if filepath.Ext(name) != snapshotSuffix { return 0, errors.New("invalid snapshot file ext") } if len(name) < 16 { return 0, errors.New("invalid snapshot file name") } return parseEpoch(name[:16]) } func (tst *tsTable) TakeFileSnapshot(dst string) error { snapshot := tst.currentSnapshot() if snapshot == nil { return fmt.Errorf("no current snapshot available") } defer snapshot.decRef() for _, pw := range snapshot.parts { if pw.mp != nil { continue } part := pw.p srcPath := part.path destPartPath := filepath.Join(dst, filepath.Base(srcPath)) if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath, nil); err != nil { return fmt.Errorf("failed to create snapshot for part %d: %w", part.partMetadata.ID, err) } } tst.createMetadata(dst, snapshot) parent := filepath.Dir(dst) tst.fileSystem.SyncPath(parent) return nil } func (tst *tsTable) createMetadata(dst string, snapshot *snapshot) { var partNames []string for i := range snapshot.parts { partNames = append(partNames, partName(snapshot.parts[i].ID())) } data, err := json.Marshal(partNames) if err != nil { logger.Panicf("cannot marshal partNames to JSON: %s", err) } snapshotPath := filepath.Join(dst, snapshotName(snapshot.epoch)) lf, err := tst.fileSystem.CreateFile(snapshotPath, storage.FilePerm) if err != nil { logger.Panicf("cannot create lock file %s: %s", snapshotPath, err) } n, err := lf.Write(data) if err != nil { logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err) } if n != len(data) { logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data)) } } func (s *service) takeGroupSnapshot(dstDir string, groupName string) error { group, ok := s.schemaRepo.LoadGroup(groupName) if !ok { return errors.Errorf("group %s not found", groupName) } db := group.SupplyTSDB() if db == nil { return errors.Errorf("group %s has no tsdb", group.GetSchema().Metadata.Name) } tsdb := db.(storage.TSDB[*tsTable, option]) if err := tsdb.TakeFileSnapshot(dstDir); err != nil { return errors.WithMessagef(err, "snapshot %s fail to take file snapshot for group %s", dstDir, group.GetSchema().Metadata.Name) } return nil } type snapshotListener struct { *bus.UnImplementedHealthyListener s *service snapshotSeq uint64 snapshotMux sync.Mutex } // Rev takes a snapshot of the database. func (s *snapshotListener) Rev(ctx context.Context, message bus.Message) bus.Message { groups := message.Data().([]*databasev1.SnapshotRequest_Group) var gg []schema.Group if len(groups) == 0 { gg = s.s.schemaRepo.LoadAllGroups() } else { for _, g := range groups { if g.Catalog != commonv1.Catalog_CATALOG_MEASURE { continue } group, ok := s.s.schemaRepo.LoadGroup(g.Group) if !ok { continue } gg = append(gg, group) } } if len(gg) == 0 { return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil) } s.snapshotMux.Lock() defer s.snapshotMux.Unlock() storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, s.s.lfs) sn := s.snapshotName() var err error for _, g := range gg { select { case <-ctx.Done(): return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil) default: } if errGroup := s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn, g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil { s.s.l.Error().Err(errGroup).Str("group", g.GetSchema().Metadata.Name).Msg("fail to take group snapshot") err = multierr.Append(err, errGroup) continue } } snp := &databasev1.Snapshot{ Name: sn, Catalog: commonv1.Catalog_CATALOG_MEASURE, } if err != nil { snp.Error = err.Error() } return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), snp) } func (s *snapshotListener) snapshotName() string { s.snapshotSeq++ return fmt.Sprintf("%s-%08X", time.Now().UTC().Format("20060102150405"), s.snapshotSeq) }