banyand/backup/lifecycle/lifecycle.go (74 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package lifecycle provides the lifecycle migration service.
package lifecycle
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/benbjohnson/clock"
"github.com/robfig/cron/v3"
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/version"
)
// NewCommand creates a new lifecycle command.
func NewCommand() *cobra.Command {
var schedule string
l := logger.GetLogger("bootstrap")
metaSvc, err := metadata.NewClient(false, false)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
metricSvc := observability.NewMetricService(metaSvc, nil, "lifecycle", nil)
svc := NewService(metaSvc, metricSvc)
group := run.NewGroup("lifecycle")
group.Register(metaSvc, metricSvc, svc)
cmd := &cobra.Command{
Short: "Run lifecycle migration",
DisableAutoGenTag: true,
Version: version.Build(),
RunE: func(cmd *cobra.Command, _ []string) error {
if err := config.Load("logging", cmd.Flags()); err != nil {
return err
}
if schedule == "" {
return group.Run(context.Background())
}
schedLogger := logger.GetLogger().Named("lifecycle-scheduler")
schedLogger.Info().Msgf("lifecycle migration will run with schedule: %s", schedule)
clockInstance := clock.New()
sch := timestamp.NewScheduler(schedLogger, clockInstance)
err := sch.Register("lifecycle", cron.Descriptor, schedule, func(_ time.Time, l *logger.Logger) bool {
err := group.Run(context.Background())
if err != nil {
l.Error().Err(err).Msg("lifecycle migration failed")
} else {
l.Info().Msg("lifecycle migration succeeded")
}
return true
})
if err != nil {
return err
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
schedLogger.Info().Msg("backup scheduler started, press Ctrl+C to exit")
<-sigChan
schedLogger.Info().Msg("shutting down lifecycle migration scheduler...")
sch.Close()
return nil
},
}
cmd.Flags().AddFlagSet(group.RegisterFlags().FlagSet)
cmd.Flags().StringVar(
&schedule,
"schedule",
"",
"Schedule expression for periodic backup. Options: @yearly, @monthly, @weekly, @daily, @hourly or @every <duration>",
)
return cmd
}