banyand/backup/lifecycle/steps.go (262 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package lifecycle import ( "context" "encoding/base64" "fmt" "time" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/backup/snapshot" "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/pub" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/node" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" ) func (l *lifecycleService) getSnapshots(groups []*commonv1.Group) (streamDir string, measureDir string, err error) { snapshotGroups := make([]*databasev1.SnapshotRequest_Group, 0, len(groups)) for _, group := range groups { snapshotGroups = append(snapshotGroups, &databasev1.SnapshotRequest_Group{ Group: group.Metadata.Name, Catalog: group.Catalog, }) } snn, err := snapshot.Get(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, snapshotGroups...) if err != nil { return "", "", err } for _, snp := range snn { snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, l.measureRoot, "") if errDir != nil { logger.Warningf("Failed to get snapshot directory for %s: %v", snp.Name, errDir) continue } if snp.Catalog == commonv1.Catalog_CATALOG_STREAM { streamDir = snapshotDir } if snp.Catalog == commonv1.Catalog_CATALOG_MEASURE { measureDir = snapshotDir } } return streamDir, measureDir, nil } func (l *lifecycleService) setupQuerySvc(ctx context.Context, streamDir, measureDir string) (streamSVC stream.Service, measureSVC measure.Service, err error) { pm := protector.NewMemory(l.omr) ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{}) if streamDir != "" { streamSVC, err = stream.NewReadonlyService(l.metadata, l.omr, pm) if err != nil { return nil, nil, err } if err = streamSVC.FlagSet().Parse([]string{"--stream-data-path", streamDir}); err != nil { return nil, nil, err } if err = streamSVC.PreRun(ctx); err != nil { return nil, nil, err } } if measureDir != "" { measureSVC, err = measure.NewReadonlyService(l.metadata, l.omr, pm) if err != nil { return nil, nil, err } if err = measureSVC.FlagSet().Parse([]string{"--measure-data-path", measureDir}); err != nil { return nil, nil, err } if err = measureSVC.PreRun(ctx); err != nil { return nil, nil, err } } return streamSVC, measureSVC, nil } func parseGroup(ctx context.Context, g *commonv1.Group, nodeLabels map[string]string, nodes []*databasev1.Node, l *logger.Logger, metadata metadata.Repo, ) (uint32, node.Selector, queue.Client, error) { ro := g.ResourceOpts if ro == nil { return 0, nil, nil, fmt.Errorf("no resource opts in group %s", g.Metadata.Name) } if len(ro.Stages) == 0 { return 0, nil, nil, fmt.Errorf("no stages in group %s", g.Metadata.Name) } var nst *commonv1.LifecycleStage for i, st := range ro.Stages { selector, err := pub.ParseLabelSelector(st.NodeSelector) if err != nil { return 0, nil, nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector) } if !selector.Matches(nodeLabels) { continue } if i+1 >= len(ro.Stages) { l.Info().Msgf("no next stage for group %s at stage %s", g.Metadata.Name, st.Name) return 0, nil, nil, nil } nst = ro.Stages[i+1] l.Info().Msgf("migrating group %s at stage %s to stage %s", g.Metadata.Name, st.Name, nst.Name) break } if nst == nil { nst = ro.Stages[0] } nsl, err := pub.ParseLabelSelector(nst.NodeSelector) if err != nil { return 0, nil, nil, errors.WithMessagef(err, "failed to parse node selector %s", nst.NodeSelector) } nodeSel := node.NewRoundRobinSelector("", metadata) if err = nodeSel.PreRun(ctx); err != nil { return 0, nil, nil, errors.WithMessage(err, "failed to run node selector") } client := pub.NewWithoutMetadata() if g.Catalog == commonv1.Catalog_CATALOG_STREAM { _ = grpc.NewClusterNodeRegistry(data.TopicStreamWrite, client, nodeSel) } else { _ = grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, client, nodeSel) } var existed bool for _, n := range nodes { if n.Labels == nil { continue } if nsl.Matches(n.Labels) { existed = true client.OnAddOrUpdate(schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindNode, }, Spec: n, }) } } if !existed { return 0, nil, nil, errors.New("no nodes matched") } return nst.ShardNum, nodeSel, client, nil } func migrateStream(ctx context.Context, s *databasev1.Stream, result model.StreamQueryResult, shardNum uint32, selector node.Selector, client queue.Client, l *logger.Logger, ) (sum int) { if result == nil { return 0 } defer result.Release() entityLocator := partition.NewEntityLocator(s.TagFamilies, s.Entity, 0) batch := client.NewBatchPublisher(30 * time.Second) defer batch.Close() for sr := result.Pull(ctx); sr != nil; sr = result.Pull(ctx) { for i := range sr.ElementIDs { writeEntity := &streamv1.WriteRequest{ Metadata: s.Metadata, Element: &streamv1.ElementValue{}, } ev := writeEntity.Element ev.ElementId = base64.StdEncoding.EncodeToString(convert.Uint64ToBytes(sr.ElementIDs[i])) ev.Timestamp = timestamppb.New(time.Unix(0, sr.Timestamps[i])) for _, tf := range sr.TagFamilies { tfw := &modelv1.TagFamilyForWrite{} for _, tag := range tf.Tags { tfw.Tags = append(tfw.Tags, tag.Values[i]) } ev.TagFamilies = append(ev.TagFamilies, tfw) } entity, tagValues, shardID, err := entityLocator.Locate(s.Metadata.Name, ev.TagFamilies, shardNum) if err != nil { l.Error().Err(err).Msg("failed to locate entity") continue } nodeID, err := selector.Pick(s.Metadata.Group, s.Metadata.Name, uint32(shardID)) if err != nil { l.Error().Err(err).Msg("failed to pick node") continue } iwr := &streamv1.InternalWriteRequest{ Request: writeEntity, ShardId: uint32(shardID), SeriesHash: pbv1.HashEntity(entity), EntityValues: tagValues[1:].Encode(), } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) _, err = batch.Publish(ctx, data.TopicStreamWrite, message) if err != nil { l.Error().Err(err).Msg("failed to publish message") continue } sum++ } } return sum } func migrateMeasure(ctx context.Context, m *databasev1.Measure, result model.MeasureQueryResult, shardNum uint32, selector node.Selector, client queue.Client, l *logger.Logger, ) (sum int) { if result == nil { return 0 } defer result.Release() entityLocator := partition.NewEntityLocator(m.TagFamilies, m.Entity, 0) batch := client.NewBatchPublisher(30 * time.Second) defer batch.Close() for mr := result.Pull(); mr != nil; mr = result.Pull() { for i := range mr.Timestamps { writeRequest := &measurev1.WriteRequest{ Metadata: m.Metadata, DataPoint: &measurev1.DataPointValue{ Timestamp: timestamppb.New(time.Unix(0, mr.Timestamps[i])), Version: mr.Versions[i], }, MessageId: uint64(time.Now().UnixNano()), } for _, tf := range mr.TagFamilies { tfWrite := &modelv1.TagFamilyForWrite{} for _, tag := range tf.Tags { tfWrite.Tags = append(tfWrite.Tags, tag.Values[i]) } writeRequest.DataPoint.TagFamilies = append(writeRequest.DataPoint.TagFamilies, tfWrite) } for _, field := range mr.Fields { writeRequest.DataPoint.Fields = append(writeRequest.DataPoint.Fields, field.Values[i]) } entity, tagValues, shardID, err := entityLocator.Locate(m.Metadata.Name, writeRequest.DataPoint.TagFamilies, shardNum) if err != nil { l.Error().Err(err).Msg("failed to locate entity") continue } nodeID, err := selector.Pick(m.Metadata.Group, m.Metadata.Name, uint32(shardID)) if err != nil { l.Error().Err(err).Msg("failed to pick node") continue } iwr := &measurev1.InternalWriteRequest{ Request: writeRequest, ShardId: uint32(shardID), SeriesHash: pbv1.HashEntity(entity), EntityValues: tagValues[1:].Encode(), } message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) _, err = batch.Publish(ctx, data.TopicMeasureWrite, message) if err != nil { l.Error().Err(err).Msg("failed to publish message") } else { sum++ } } } return sum }