pitr/cli/internal/cmd/restore.go (203 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"os"
"strings"
"time"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
var (
// database names exist in ss proxy and backup, will to be dropped
databaseNamesExist []string
)
var RestoreCmd = &cobra.Command{
Use: "restore",
Short: "Restore a database cluster ",
Run: func(cmd *cobra.Command, args []string) {
cmd.Flags().VisitAll(func(flag *pflag.Flag) {
fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value)
})
if CSN == "" && RecordID == "" {
logging.Error("Please specify csn or record id")
return
}
if CSN != "" && RecordID != "" {
logging.Error("Please specify only one of csn and record id")
return
}
if err := restore(); err != nil {
logging.Error(err.Error())
}
},
}
func init() {
RootCmd.AddCommand(RestoreCmd)
RestoreCmd.Flags().StringVarP(&Host, "host", "H", "", "ss-proxy hostname or ip")
_ = RestoreCmd.MarkFlagRequired("host")
RestoreCmd.Flags().Uint16VarP(&Port, "port", "P", 0, "ss-proxy port")
_ = RestoreCmd.MarkFlagRequired("port")
RestoreCmd.Flags().StringVarP(&Username, "username", "u", "", "ss-proxy username")
_ = RestoreCmd.MarkFlagRequired("username")
RestoreCmd.Flags().StringVarP(&Password, "password", "p", "", "ss-proxy password")
_ = RestoreCmd.MarkFlagRequired("password")
RestoreCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path")
_ = RestoreCmd.MarkFlagRequired("dn-backup-path")
RestoreCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port")
_ = RestoreCmd.MarkFlagRequired("agent-port")
RestoreCmd.Flags().StringVarP(&CSN, "csn", "", "", "commit sequence number")
RestoreCmd.Flags().StringVarP(&RecordID, "id", "", "", "backup record id")
}
func restore() error {
// init local storage
ls, err := pkg.NewLocalStorage(pkg.DefaultRootDir())
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("new local storage failed, err:%s", err.Error()))
}
proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDBName, Host, Port)
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("new ss-proxy failed, err:%s", err.Error()))
}
// get backup record
var bak *model.LsBackup
if CSN != "" {
bak, err = ls.ReadByCSN(CSN)
if err != nil {
return xerr.NewCliErr("read backup record by csn failed")
}
}
if RecordID != "" {
bak, err = ls.ReadByID(RecordID)
if err != nil {
return xerr.NewCliErr("read backup record by id failed")
}
}
if bak == nil {
return xerr.NewCliErr("backup record not found")
}
// check if the backup logic database exits,
// if exits, we need to warning user that we will drop the database.
if err := checkDatabaseExist(proxy, bak); err != nil {
return xerr.NewCliErr(fmt.Sprintf("check database exist failed:%s", err.Error()))
}
// check agent server status
logging.Info("Checking agent server status...")
if available := checkAgentServerStatus(bak); !available {
return xerr.NewCliErr("one or more agent server are not available.")
}
// exec restore
logging.Info("Start restore backup data to openGauss...")
if err := execRestore(bak); err != nil {
return xerr.NewCliErr(fmt.Sprintf("exec restore failed:%s", err.Error()))
}
logging.Info("Restore backup data to openGauss success!")
// restore metadata to ss-proxy
if err := restoreDataToSSProxy(proxy, bak); err != nil {
return xerr.NewCliErr(fmt.Sprintf("restore metadata to ss-proxy failed:%s", err.Error()))
}
logging.Info("Restore success!")
return nil
}
func checkDatabaseExist(proxy pkg.IShardingSphereProxy, bak *model.LsBackup) error {
clusterNow, err := proxy.ExportMetaData()
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("get cluster metadata failed:%s", err.Error()))
}
for k := range bak.SsBackup.ClusterInfo.MetaData.Databases {
if _, ok := clusterNow.MetaData.Databases[k]; ok {
databaseNamesExist = append(databaseNamesExist, k)
}
}
if len(databaseNamesExist) == 0 {
return nil
}
// get user input to confirm
prompt := fmt.Sprintf(
"Detected That The Database [%s] Already Exists In ShardingSphere-Proxy Metadata.\n"+
"The Logic Database Will Be DROPPED And Then Insert Backup's Metadata Into ShardingSphere-Proxy After Restoring The Backup Data.\n"+
"Are you sure to continue? (Y|N)", strings.Join(databaseNamesExist, ","))
return getUserApproveInTerminal(prompt)
}
func restoreDataToSSProxy(proxy pkg.IShardingSphereProxy, lsBackup *model.LsBackup) error {
// drop database if exists
for _, shardingDBName := range databaseNamesExist {
logging.Info(fmt.Sprintf("Dropping database: [%s] ...", shardingDBName))
if err := proxy.DropDatabase(shardingDBName); err != nil {
return xerr.NewCliErr(fmt.Sprintf("drop database failed:%s", err.Error()))
}
}
// import metadata
if err := proxy.ImportMetaData(lsBackup.SsBackup.ClusterInfo); err != nil {
return xerr.NewCliErr(fmt.Sprintf("Import metadata to ss-proxy failed:%s", err.Error()))
}
return nil
}
func execRestore(lsBackup *model.LsBackup) error {
var (
totalNum = len(lsBackup.SsBackup.StorageNodes)
dataNodeMap = make(map[string]*model.DataNode)
resultCh = make(chan *model.RestoreResult, totalNum)
dnResult = make([]*model.RestoreResult, 0)
restoreFinalStatus = "Completed"
)
for _, dataNode := range lsBackup.DnList {
dataNodeMap[dataNode.IP] = dataNode
}
if totalNum == 0 {
return xerr.NewCliErr(fmt.Sprintf("no storage node found, please check backup record [%s].", lsBackup.Info.ID))
}
pw := prettyoutput.NewPW(totalNum)
go pw.Render()
for i := 0; i < totalNum; i++ {
sn := lsBackup.SsBackup.StorageNodes[i]
dn := dataNodeMap[sn.IP]
as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
go doRestore(as, sn, dn.BackupID, resultCh, pw)
}
time.Sleep(time.Millisecond * 100)
for pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
close(resultCh)
for result := range resultCh {
dnResult = append(dnResult, result)
if result.Status != "Completed" {
restoreFinalStatus = "Failed"
}
}
// print result formatted
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.SetTitle("Restore Task Result: %s", restoreFinalStatus)
t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", "Result"})
for i, dn := range dnResult {
t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status})
t.AppendSeparator()
}
t.Render()
if restoreFinalStatus == "Failed" {
return xerr.NewCliErr("restore failed, please check the log for more details.")
}
return nil
}
func doRestore(as pkg.IAgentServer, sn *model.StorageNode, backupID string, resultCh chan *model.RestoreResult, pw progress.Writer) {
tracker := &progress.Tracker{Message: fmt.Sprintf("Restore data to openGauss: %s", sn.IP)}
result := ""
in := &model.RestoreIn{
DBPort: sn.Port,
DBName: sn.Database,
Username: sn.Username,
Password: sn.Password,
Instance: defaultInstance,
DnBackupPath: BackupPath,
DnBackupID: backupID,
}
pw.AppendTracker(tracker)
if err := as.Restore(in); err != nil {
tracker.MarkAsErrored()
result = "Failed"
} else {
tracker.MarkAsDone()
result = "Completed"
}
resultCh <- &model.RestoreResult{
IP: sn.IP,
Port: sn.Port,
Status: result,
}
}