banyand/backup/lifecycle/service.go (339 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package lifecycle
import (
"context"
"math"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/node"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// service combines the run.Config and run.Service contracts. It is only used
// by the compile-time assertion on lifecycleService in this file.
type service interface {
run.Config
run.Service
}
// Compile-time check that *lifecycleService implements the service interface.
var _ service = (*lifecycleService)(nil)
// lifecycleService is the run unit that migrates the stream and measure data
// of staged groups and then asks the data node to delete the expired segments.
// Progress is persisted to progressFilePath so an interrupted pass can resume.
type lifecycleService struct {
// metadata gives access to the group, node, stream and measure registries.
metadata metadata.Repo
// omr is the metrics registry supplied to NewService.
omr observability.MetricsRegistry
// l is the logger; it is assigned at the start of Serve.
l *logger.Logger
// gRPCAddr is the gRPC address of the data node (flag "grpc-addr").
gRPCAddr string
// cert is the path to the gRPC server certificate (flag "cert").
cert string
// streamRoot is the root directory for the stream catalog (flag "stream-root-path").
streamRoot string
// measureRoot is the root directory for the measure catalog (flag "measure-root-path").
measureRoot string
// progressFilePath is where progress is stored for crash recovery (flag "progress-file").
progressFilePath string
// enableTLS enables TLS for the gRPC connection (flag "enable-tls").
enableTLS bool
// insecure skips server certificate verification (flag "insecure").
insecure bool
}
// NewService builds the lifecycle unit from a metadata repository and a
// metrics registry. All flag-backed fields keep their zero values here; they
// are bound to command-line flags by FlagSet, and the logger is set in Serve.
func NewService(meta metadata.Repo, omr observability.MetricsRegistry) run.Unit {
	svc := new(lifecycleService)
	svc.metadata = meta
	svc.omr = omr
	return svc
}
// FlagSet registers the command-line flags of the lifecycle service and binds
// them to the receiver's fields (node labels are bound to the shared
// common.FlagNodeLabels variable). The flag set is named after l.Name().
func (l *lifecycleService) FlagSet() *run.FlagSet {
	fs := run.NewFlagSet(l.Name())

	// Labels describing this node.
	fs.StringSliceVar(&common.FlagNodeLabels, "node-labels", nil, "the node labels. e.g. key1=value1,key2=value2")

	// Connection to the data node.
	fs.StringVar(&l.gRPCAddr, "grpc-addr", "127.0.0.1:17912", "gRPC address of the data node")
	fs.BoolVar(&l.enableTLS, "enable-tls", false, "Enable TLS for gRPC connection")
	fs.BoolVar(&l.insecure, "insecure", false, "Skip server certificate verification")
	fs.StringVar(&l.cert, "cert", "", "Path to the gRPC server certificate")

	// Local storage locations.
	fs.StringVar(&l.streamRoot, "stream-root-path", "/tmp", "Root directory for stream catalog")
	fs.StringVar(&l.measureRoot, "measure-root-path", "/tmp", "Root directory for measure catalog")
	fs.StringVar(&l.progressFilePath, "progress-file", "/tmp/lifecycle-progress.json", "Path to store progress for crash recovery")

	return fs
}
// Validate performs no checks on the parsed flags and always returns nil.
func (l *lifecycleService) Validate() error {
return nil
}
// GracefulStop is a no-op. Serve does all of its work before returning and
// stops the query services it started through its own deferred calls.
func (l *lifecycleService) GracefulStop() {
}
// Name returns the unit name "lifecycle"; FlagSet uses it to name the flag set.
func (l *lifecycleService) Name() string {
return "lifecycle"
}
// Serve runs one lifecycle migration pass synchronously and returns a channel
// that is already closed.
//
// The pass loads the saved progress, selects the groups that still need work,
// takes snapshots, starts the query services over those snapshots, lists the
// data nodes and then processes each group according to its catalog. After
// each processed group the progress file is updated; after the last group it
// is removed. Any failure during setup is logged and ends the pass early.
func (l *lifecycleService) Serve() run.StopNotify {
	l.l = logger.GetLogger("lifecycle")

	// All work happens inline, so the notification channel is closed up
	// front and returned from every exit path.
	stopped := make(chan struct{})
	close(stopped)

	ctx := context.Background()
	progress := LoadProgress(l.progressFilePath, l.l)

	pending, err := l.getGroupsToProcess(ctx, progress)
	if err != nil {
		l.l.Error().Err(err).Msg("failed to get groups to process")
		return stopped
	}
	if len(pending) == 0 {
		l.l.Info().Msg("no groups to process, all groups already completed")
		progress.Remove(l.progressFilePath, l.l)
		return stopped
	}

	streamDir, measureDir, err := l.getSnapshots(pending)
	if err != nil {
		l.l.Error().Err(err).Msg("failed to get snapshots")
		return stopped
	}

	streamSVC, measureSVC, err := l.setupQuerySvc(ctx, streamDir, measureDir)
	if err != nil {
		l.l.Error().Err(err).Msg("failed to setup query service")
		return stopped
	}
	// Either service may be nil; only the ones that exist are stopped on return.
	if streamSVC != nil {
		defer streamSVC.GracefulStop()
	}
	if measureSVC != nil {
		defer measureSVC.GracefulStop()
	}

	dataNodes, err := l.metadata.NodeRegistry().ListNode(ctx, databasev1.Role_ROLE_DATA)
	if err != nil {
		l.l.Error().Err(err).Msg("failed to list data nodes")
		return stopped
	}
	nodeLabels := common.ParseNodeFlags()

	for _, group := range pending {
		switch group.Catalog {
		case commonv1.Catalog_CATALOG_STREAM:
			if streamSVC == nil {
				// Skipped groups are not marked as completed.
				l.l.Error().Msgf("stream service is not available, skipping group: %s", group.Metadata.Name)
				continue
			}
			l.processStreamGroup(ctx, group, streamSVC, dataNodes, nodeLabels, progress)
		case commonv1.Catalog_CATALOG_MEASURE:
			if measureSVC == nil {
				// Skipped groups are not marked as completed.
				l.l.Error().Msgf("measure service is not available, skipping group: %s", group.Metadata.Name)
				continue
			}
			l.processMeasureGroup(ctx, group, measureSVC, dataNodes, nodeLabels, progress)
		default:
			l.l.Info().Msgf("group catalog: %s doesn't support lifecycle management", group.Catalog)
		}

		// Checkpoint after every group that was not skipped above.
		progress.MarkGroupCompleted(group.Metadata.Name)
		progress.Save(l.progressFilePath, l.l)
	}

	progress.Remove(l.progressFilePath, l.l)
	l.l.Info().Msg("lifecycle migration completed successfully")
	return stopped
}
// getGroupsToProcess lists all groups and returns those that still need a
// lifecycle pass: groups that declare at least one stage in their resource
// options and that the progress record does not already report as completed.
// A listing failure is logged and returned to the caller.
func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Progress) ([]*commonv1.Group, error) {
	all, err := l.metadata.GroupRegistry().ListGroup(ctx)
	if err != nil {
		l.l.Error().Err(err).Msg("failed to list groups")
		return nil, err
	}

	pending := make([]*commonv1.Group, 0, len(all))
	for _, candidate := range all {
		switch {
		case candidate.ResourceOpts == nil || len(candidate.ResourceOpts.Stages) == 0:
			// No stages configured: nothing to migrate for this group.
		case progress.IsGroupCompleted(candidate.Metadata.Name):
			l.l.Info().Msgf("skipping already completed group: %s", candidate.Metadata.Name)
		default:
			pending = append(pending, candidate)
		}
	}
	return pending, nil
}
// processStreamGroup handles one stream group: it resolves the shard count,
// node selector and queue client via parseGroup, lists the group's streams,
// migrates them over the removal time range reported by streamSVC, and finally
// requests deletion of the expired segments. Failures to parse the group or to
// list its streams are logged and end the processing of this group.
func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group, streamSVC stream.Service,
	nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
	shardNum, selector, client, parseErr := parseGroup(ctx, g, labels, nodes, l.l, l.metadata)
	if parseErr != nil {
		l.l.Error().Err(parseErr).Msgf("failed to parse group %s", g.Metadata.Name)
		return
	}
	// The queue client is released once the whole group has been handled.
	defer client.GracefulStop()

	listOpt := schema.ListOpt{Group: g.Metadata.Name}
	streams, listErr := l.metadata.StreamRegistry().ListStream(ctx, listOpt)
	if listErr != nil {
		l.l.Error().Err(listErr).Msgf("failed to list streams in group %s", g.Metadata.Name)
		return
	}

	removalRange := streamSVC.GetRemovalSegmentsTimeRange(g.Metadata.Name)
	l.processStreams(ctx, g, streams, streamSVC, removalRange, shardNum, selector, client, progress)
	l.deleteExpiredStreamSegments(ctx, g, removalRange, progress)
}
// processStreams migrates every stream in streams that the progress record
// does not already report as completed for group g.
//
// A stream is marked completed, and the progress file saved, only after
// processSingleStream succeeds. When it fails, the stream is left unmarked so
// the progress file never records a failed migration as done; the cause has
// already been logged by processSingleStream.
func (l *lifecycleService) processStreams(ctx context.Context, g *commonv1.Group, streams []*databasev1.Stream,
	streamSVC stream.Service, tr *timestamp.TimeRange, shardNum uint32, selector node.Selector, client queue.Client, progress *Progress,
) {
	for _, s := range streams {
		if progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) {
			l.l.Info().Msgf("skipping already completed stream: %s/%s", g.Metadata.Name, s.Metadata.Name)
			continue
		}
		sum, err := l.processSingleStream(ctx, s, streamSVC, tr, shardNum, selector, client)
		if err != nil {
			// Do not checkpoint a failed stream; move on to the next one.
			continue
		}
		l.l.Info().Msgf("migrated %d elements in stream %s", sum, s.Metadata.Name)
		progress.MarkStreamCompleted(g.Metadata.Name, s.Metadata.Name)
		progress.Save(l.progressFilePath, l.l)
	}
}
// processSingleStream queries every element of stream s inside tr, projecting
// all tags of all tag families and matching any entity value, and passes the
// result to migrateStream. It returns the value reported by migrateStream, or
// 0 and the error when the stream cannot be opened or queried (the error is
// logged here as well).
func (l *lifecycleService) processSingleStream(ctx context.Context, s *databasev1.Stream,
	streamSVC stream.Service, tr *timestamp.TimeRange, shardNum uint32, selector node.Selector, client queue.Client,
) (int, error) {
	querier, err := streamSVC.Stream(s.Metadata)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to get stream %s", s.Metadata.Name)
		return 0, err
	}

	// Project every tag of every family.
	projections := make([]model.TagProjection, 0, len(s.TagFamilies))
	for _, family := range s.TagFamilies {
		names := make([]string, 0, len(family.Tags))
		for _, tag := range family.Tags {
			names = append(names, tag.Name)
		}
		projections = append(projections, model.TagProjection{
			Family: family.Name,
			Names:  names,
		})
	}

	// One wildcard value per entity tag.
	wildcard := make([]*modelv1.TagValue, 0, len(s.Entity.TagNames))
	for range s.Entity.TagNames {
		wildcard = append(wildcard, pbv1.AnyTagValue)
	}

	opts := model.StreamQueryOptions{
		Name:           s.Metadata.Name,
		TagProjection:  projections,
		Entities:       [][]*modelv1.TagValue{wildcard},
		TimeRange:      tr,
		MaxElementSize: math.MaxInt,
	}
	result, err := querier.Query(ctx, opts)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to query stream %s", s.Metadata.Name)
		return 0, err
	}

	migrated := migrateStream(ctx, s, result, shardNum, selector, client, l.l)
	return migrated, nil
}
// deleteExpiredStreamSegments asks the data node at l.gRPCAddr to delete the
// segments of stream group g that fall inside tr. It does nothing when the
// progress record already reports the deletion. On success it logs the number
// of deleted segments, marks the deletion in the progress record and saves it;
// on failure it only logs the error.
func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
	groupName := g.Metadata.Name
	if progress.IsStreamGroupDeleted(groupName) {
		l.l.Info().Msgf("skipping already deleted stream group segments: %s", groupName)
		return
	}

	// Invoked by snapshot.Conn with the established connection.
	deleteExpired := func(conn *grpc.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) {
		req := &streamv1.DeleteExpiredSegmentsRequest{
			Group: groupName,
			TimeRange: &modelv1.TimeRange{
				Begin: timestamppb.New(tr.Start),
				End:   timestamppb.New(tr.End),
			},
		}
		return streamv1.NewStreamServiceClient(conn).DeleteExpiredSegments(ctx, req)
	}

	resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, deleteExpired)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to delete expired segments in group %s", groupName)
		return
	}

	l.l.Info().Msgf("deleted %d expired segments in group %s", resp.Deleted, groupName)
	progress.MarkStreamGroupDeleted(groupName)
	progress.Save(l.progressFilePath, l.l)
}
// processMeasureGroup handles one measure group: it resolves the shard count,
// node selector and queue client via parseGroup, lists the group's measures,
// migrates them over the removal time range reported by measureSVC, and
// finally requests deletion of the expired segments. Failures to parse the
// group or to list its measures are logged and end the processing of this
// group.
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureSVC measure.Service,
	nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
	shardNum, selector, client, parseErr := parseGroup(ctx, g, labels, nodes, l.l, l.metadata)
	if parseErr != nil {
		l.l.Error().Err(parseErr).Msgf("failed to parse group %s", g.Metadata.Name)
		return
	}
	// The queue client is released once the whole group has been handled.
	defer client.GracefulStop()

	listOpt := schema.ListOpt{Group: g.Metadata.Name}
	measures, listErr := l.metadata.MeasureRegistry().ListMeasure(ctx, listOpt)
	if listErr != nil {
		l.l.Error().Err(listErr).Msgf("failed to list measures in group %s", g.Metadata.Name)
		return
	}

	removalRange := measureSVC.GetRemovalSegmentsTimeRange(g.Metadata.Name)
	l.processMeasures(ctx, g, measures, measureSVC, removalRange, shardNum, selector, client, progress)
	l.deleteExpiredMeasureSegments(ctx, g, removalRange, progress)
}
// processMeasures migrates every measure in measures that the progress record
// does not already report as completed for group g.
//
// A measure is marked completed, and the progress file saved, only after
// processSingleMeasure succeeds. When it fails, the measure is left unmarked
// so the progress file never records a failed migration as done; the cause has
// already been logged by processSingleMeasure.
func (l *lifecycleService) processMeasures(ctx context.Context, g *commonv1.Group, measures []*databasev1.Measure,
	measureSVC measure.Service, tr *timestamp.TimeRange, shardNum uint32, selector node.Selector, client queue.Client, progress *Progress,
) {
	for _, m := range measures {
		if progress.IsMeasureCompleted(g.Metadata.Name, m.Metadata.Name) {
			l.l.Info().Msgf("skipping already completed measure: %s/%s", g.Metadata.Name, m.Metadata.Name)
			continue
		}
		sum, err := l.processSingleMeasure(ctx, m, measureSVC, tr, shardNum, selector, client)
		if err != nil {
			// Do not checkpoint a failed measure; move on to the next one.
			continue
		}
		l.l.Info().Msgf("migrated %d elements in measure %s", sum, m.Metadata.Name)
		progress.MarkMeasureCompleted(g.Metadata.Name, m.Metadata.Name)
		progress.Save(l.progressFilePath, l.l)
	}
}
// processSingleMeasure queries every data point of measure m inside tr,
// projecting all tags of all tag families and all fields and matching any
// entity value, and passes the result to migrateMeasure. It returns the value
// reported by migrateMeasure, or 0 and the error when the measure cannot be
// opened or queried (the error is logged here as well).
func (l *lifecycleService) processSingleMeasure(ctx context.Context, m *databasev1.Measure,
	measureSVC measure.Service, tr *timestamp.TimeRange, shardNum uint32, selector node.Selector, client queue.Client,
) (int, error) {
	querier, err := measureSVC.Measure(m.Metadata)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to get measure %s", m.Metadata.Name)
		return 0, err
	}

	// Project every tag of every family.
	projections := make([]model.TagProjection, 0, len(m.TagFamilies))
	for _, family := range m.TagFamilies {
		names := make([]string, 0, len(family.Tags))
		for _, tag := range family.Tags {
			names = append(names, tag.Name)
		}
		projections = append(projections, model.TagProjection{
			Family: family.Name,
			Names:  names,
		})
	}

	// Project every field.
	fields := make([]string, 0, len(m.Fields))
	for _, field := range m.Fields {
		fields = append(fields, field.Name)
	}

	// One wildcard value per entity tag.
	wildcard := make([]*modelv1.TagValue, 0, len(m.Entity.TagNames))
	for range m.Entity.TagNames {
		wildcard = append(wildcard, pbv1.AnyTagValue)
	}

	opts := model.MeasureQueryOptions{
		Name:            m.Metadata.Name,
		TagProjection:   projections,
		FieldProjection: fields,
		Entities:        [][]*modelv1.TagValue{wildcard},
		TimeRange:       tr,
	}
	result, err := querier.Query(ctx, opts)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to query measure %s", m.Metadata.Name)
		return 0, err
	}

	migrated := migrateMeasure(ctx, m, result, shardNum, selector, client, l.l)
	return migrated, nil
}
// deleteExpiredMeasureSegments asks the data node at l.gRPCAddr to delete the
// segments of measure group g that fall inside tr. It does nothing when the
// progress record already reports the deletion. On success it logs the number
// of deleted segments, marks the deletion in the progress record and saves it;
// on failure it only logs the error.
func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
	groupName := g.Metadata.Name
	if progress.IsMeasureGroupDeleted(groupName) {
		l.l.Info().Msgf("skipping already deleted measure group segments: %s", groupName)
		return
	}

	// Invoked by snapshot.Conn with the established connection.
	deleteExpired := func(conn *grpc.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, error) {
		req := &measurev1.DeleteExpiredSegmentsRequest{
			Group: groupName,
			TimeRange: &modelv1.TimeRange{
				Begin: timestamppb.New(tr.Start),
				End:   timestamppb.New(tr.End),
			},
		}
		return measurev1.NewMeasureServiceClient(conn).DeleteExpiredSegments(ctx, req)
	}

	resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, deleteExpired)
	if err != nil {
		l.l.Error().Err(err).Msgf("failed to delete expired segments in group %s", groupName)
		return
	}

	l.l.Info().Msgf("deleted %d expired segments in group %s", resp.Deleted, groupName)
	progress.MarkMeasureGroupDeleted(groupName)
	progress.Save(l.progressFilePath, l.l)
}