pitr/cli/internal/cmd/backup.go

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"fmt"
	"os"
	"time"

	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
	"github.com/google/uuid"
	"github.com/jedib0t/go-pretty/v6/progress"
	"github.com/jedib0t/go-pretty/v6/table"
	"github.com/spf13/cobra"
	"github.com/spf13/pflag"
	"golang.org/x/sync/errgroup"
)

const (
	// defaultInstance is the backup instance name used on openGauss; it may become configurable in the future.
	defaultInstance = "ins-default-ss"
	// defaultShowDetailRetryTimes is how many times a failed ShowDetail call to the agent server is retried.
	defaultShowDetailRetryTimes = 3
)

var filename string

var BackupCmd = &cobra.Command{
	Use:   "backup",
	Short: "Backup a database cluster",
	Run: func(cmd *cobra.Command, args []string) {
		cmd.Flags().VisitAll(func(flag *pflag.Flag) {
			fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value)
		})

		// Convert BackupModeStr to BackupMode.
		switch BackupModeStr {
		case "FULL", "full":
			BackupMode = model.DBBackModeFull
		case "PTRACK", "ptrack":
			BackupMode = model.DBBackModePTrack
		}

		if BackupMode == model.DBBackModePTrack {
			logging.Warn("Please make sure all openGauss nodes have been configured correctly for ptrack. " +
				"You can refer to https://support.huaweicloud.com/intl/zh-cn/devg-opengauss/opengauss_devg_1362.html for more details.")
		}

		logging.Info(fmt.Sprintf("Default backup path: %s", pkg.DefaultRootDir()))

		// Start the backup.
		if err := backup(); err != nil {
			logging.Error(err.Error())
		}
	},
}

func init() {
	RootCmd.AddCommand(BackupCmd)

	BackupCmd.Flags().StringVarP(&Host, "host", "H", "", "ss-proxy hostname or ip")
	_ = BackupCmd.MarkFlagRequired("host")
	BackupCmd.Flags().Uint16VarP(&Port, "port", "P", 0, "ss-proxy port")
	_ = BackupCmd.MarkFlagRequired("port")
	BackupCmd.Flags().StringVarP(&Username, "username", "u", "", "ss-proxy username")
	_ = BackupCmd.MarkFlagRequired("username")
	BackupCmd.Flags().StringVarP(&Password, "password", "p", "", "ss-proxy password")
	_ = BackupCmd.MarkFlagRequired("password")
	BackupCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path")
	_ = BackupCmd.MarkFlagRequired("dn-backup-path")
	BackupCmd.Flags().StringVarP(&BackupModeStr, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)")
	_ = BackupCmd.MarkFlagRequired("dn-backup-mode")
	BackupCmd.Flags().Uint8VarP(&ThreadsNum, "dn-threads-num", "j", 1, "openGauss data backup threads num")
	BackupCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port")
	_ = BackupCmd.MarkFlagRequired("agent-port")
}
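
// An illustrative invocation of this subcommand, matching the flags registered
// above (the binary name "gs_pitr" is an assumption; substitute whatever name
// the pitr CLI is built as):
//
//	gs_pitr backup -H 127.0.0.1 -P 3307 -u sharding -p sharding \
//	    -B /home/omm/data/backup -b FULL -j 4 -a 443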
// Steps of backup:
//  1. Lock the cluster.
//  2. Export cluster metadata and save the local backup info.
//  3. Check that all agent servers are available.
//  4. Check disk space on the data nodes.
//  5. Send the backup command to each agent server.
//  6. Unlock the cluster.
//  7. Persist the updated backup info.
//  8. Wait for all agent-side backups to finish and double-check their status.
//  9. Persist the final backup info.
//
//nolint:gocognit
func backup() error {
	var err error
	var lsBackup *model.LsBackup

	proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDBName, Host, Port)
	if err != nil {
		return xerr.NewCliErr("Create ss-proxy connect failed")
	}
	ls, err := pkg.NewLocalStorage(pkg.DefaultRootDir())
	if err != nil {
		return xerr.NewCliErr("Create local storage failed")
	}

	defer func() {
		if err != nil {
			logging.Warn("Try to unlock cluster ...")
			if err := proxy.Unlock(); err != nil {
				logging.Error(fmt.Sprintf("Backup failed and the follow-up cluster unlock failed too, err:%s", err.Error()))
			}
			if lsBackup != nil {
				logging.Warn("Try to delete backup data ...")
				deleteBackupFiles(ls, lsBackup)
			}
		}
	}()

	// Step1. Lock the cluster.
	logging.Info("Starting lock cluster ...")
	err = proxy.LockForBackup()
	if err != nil {
		return xerr.NewCliErr("Lock for backup failed")
	}

	// Step2. Export cluster metadata and save the local backup info.
	logging.Info("Starting export metadata ...")
	lsBackup, err = exportData(proxy, ls)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("export backup data failed, err:%s", err.Error()))
	}
	logging.Info(fmt.Sprintf("Export backup data success, backup filename: %s", filename))

	// Step3. Check the agent server status.
	logging.Info("Checking agent server status...")
	if available := checkAgentServerStatus(lsBackup); !available {
		logging.Error("Cancel! One or more agent servers are not available.")
		err = xerr.NewCliErr("One or more agent servers are not available.")
		return err
	}

	// Step4. Check disk space.
	logging.Info("Checking disk space...")
	err = checkDiskSpace(lsBackup)
	if err != nil {
		return xerr.NewCliErr(err.Error())
	}

	// Step5. Send the backup command to the agent servers.
	logging.Info("Starting backup ...")
	err = execBackup(lsBackup)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("exec backup failed, err:%s", err.Error()))
	}

	// Step6. Unlock the cluster.
	logging.Info("Starting unlock cluster ...")
	err = proxy.Unlock()
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("unlock cluster failed, err:%s", err.Error()))
	}

	// Step7. Update the backup file.
	logging.Info("Starting update backup file ...")
	err = ls.WriteByJSON(filename, lsBackup)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err:%s", err.Error()))
	}

	// Step8. Check the agent-side backup status.
	logging.Info("Starting check backup status ...")
	status := checkBackupStatus(lsBackup)
	logging.Info(fmt.Sprintf("Backup result: %s", status))
	if status != model.SsBackupStatusCompleted && status != model.SsBackupStatusCanceled {
		err = xerr.NewCliErr("Backup failed")
		return err
	}

	// Step9. Finish the backup and update the backup file.
	logging.Info("Starting update backup file ...")
	err = ls.WriteByJSON(filename, lsBackup)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err: %s", err.Error()))
	}

	logging.Info("Backup finished!")
	return nil
}
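
// Note on the ordering above: the cluster stays locked from metadata export
// (Step2) through dispatching the backup command (Step5); the data copy itself
// runs asynchronously on each agent server, so the cluster is unlocked (Step6)
// before the CLI starts polling for completion (Step8).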
func exportData(proxy pkg.IShardingSphereProxy, ls pkg.ILocalStorage) (lsBackup *model.LsBackup, err error) {
	// Step1. Export cluster metadata from ss-proxy.
	cluster, err := proxy.ExportMetaData()
	if err != nil {
		return nil, xerr.NewCliErr("export meta data failed")
	}

	// Step2. Export storage nodes from ss-proxy.
	nodes, err := proxy.ExportStorageNodes()
	if err != nil {
		return nil, xerr.NewCliErr("export storage nodes failed")
	}

	// Step3. Combine the backup contents.
	filename = ls.GenFilename(pkg.ExtnJSON)
	csn := ""
	if cluster.SnapshotInfo != nil {
		csn = cluster.SnapshotInfo.Csn
	}
	contents := &model.LsBackup{
		Info: &model.BackupMetaInfo{
			ID:         uuid.New().String(), // generate a uuid for this backup
			CSN:        csn,
			StartTime:  time.Now().Unix(),
			EndTime:    0,
			BackupMode: BackupMode,
		},
		SsBackup: &model.SsBackup{
			Status:       model.SsBackupStatusWaiting, // the initial status of a backup is model.SsBackupStatusWaiting
			ClusterInfo:  cluster,
			StorageNodes: nodes,
		},
	}

	// Step4. Finally, save the data to local storage as JSON.
	if err := ls.WriteByJSON(filename, contents); err != nil {
		return nil, xerr.NewCliErr("write backup info by json failed")
	}
	return contents, nil
}

func execBackup(lsBackup *model.LsBackup) error {
	sNodes := lsBackup.SsBackup.StorageNodes
	dnCh := make(chan *model.DataNode, len(sNodes))
	g := new(errgroup.Group)

	logging.Info("Starting send backup command to agent server...")

	for _, node := range sNodes {
		sn := node
		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
		g.Go(func() error {
			return _execBackup(as, sn, dnCh)
		})
	}
	err := g.Wait()
	close(dnCh)

	// If any node backup failed, return the error.
	if err != nil {
		lsBackup.SsBackup.Status = model.SsBackupStatusFailed
		return xerr.NewCliErr(fmt.Sprintf("node backup failed, err:%s", err.Error()))
	}

	// Save the data node list to lsBackup.
	for dn := range dnCh {
		lsBackup.DnList = append(lsBackup.DnList, dn)
	}

	lsBackup.SsBackup.Status = model.SsBackupStatusRunning
	return nil
}

func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.DataNode) error {
	in := &model.BackupIn{
		DBPort:       node.Port,
		DBName:       node.Database,
		Username:     node.Username,
		Password:     node.Password,
		DnBackupPath: BackupPath,
		DnThreadsNum: ThreadsNum,
		DnBackupMode: BackupMode,
		Instance:     defaultInstance,
	}
	backupID, err := as.Backup(in)
	if err != nil {
		return xerr.NewCliErr(fmt.Sprintf("backup failed, err:%s", err.Error()))
	}

	// Record the new data node so the caller can update lsBackup.DnList.
	dn := &model.DataNode{
		IP:        node.IP,
		Port:      node.Port,
		Status:    model.SsBackupStatusRunning,
		BackupID:  backupID,
		StartTime: time.Now().Unix(),
		EndTime:   0,
	}
	dnCh <- dn
	return nil
}
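
// For orientation, the local backup record maintained by exportData and
// execBackup has roughly this shape once serialized (a sketch only: the real
// JSON keys come from the model package's struct tags, which are not shown here):
//
//	{
//	  "info":      {"id": "<uuid>", "csn": "...", "start_time": ..., "end_time": 0, "backup_mode": "FULL"},
//	  "ss_backup": {"status": "...", "cluster_info": {...}, "storage_nodes": [...]},
//	  "dn_list":   [{"ip": "...", "port": ..., "status": "...", "backup_id": "...", ...}]
//	}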
func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
	var (
		dataNodeMap       = make(map[string]*model.DataNode)
		dnCh              = make(chan *model.DataNode, len(lsBackup.DnList))
		backupFinalStatus = model.SsBackupStatusCompleted
		totalNum          = len(lsBackup.SsBackup.StorageNodes)
		dnResult          = make([]*model.DataNode, 0)
	)

	if totalNum == 0 {
		logging.Info("No data node needs to be backed up")
		return model.SsBackupStatusCanceled
	}

	for _, dn := range lsBackup.DnList {
		dataNodeMap[dn.IP] = dn
	}

	pw := prettyoutput.NewPW(totalNum)
	go pw.Render()
	for idx := 0; idx < totalNum; idx++ {
		sn := lsBackup.SsBackup.StorageNodes[idx]
		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
		dn := dataNodeMap[sn.IP]
		go checkStatus(as, sn, dn, dnCh, pw)
	}

	// Wait until all data node backups have finished.
	time.Sleep(time.Millisecond * 100)
	for pw.IsRenderInProgress() {
		time.Sleep(time.Millisecond * 100)
	}
	close(dnCh)

	for dn := range dnCh {
		dnResult = append(dnResult, dn)
		if dn.Status != model.SsBackupStatusCompleted {
			backupFinalStatus = model.SsBackupStatusFailed
		}
	}

	// Print the backup result as a formatted table.
	t := table.NewWriter()
	t.SetOutputMirror(os.Stdout)
	t.SetTitle("Backup Task Result: %s", backupFinalStatus)
	t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", "Result"})
	for i, dn := range dnResult {
		t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status})
		t.AppendSeparator()
	}
	t.Render()

	lsBackup.DnList = dnResult
	lsBackup.SsBackup.Status = backupFinalStatus
	lsBackup.Info.EndTime = time.Now().Unix()
	return backupFinalStatus
}

func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
	var (
		// ticker: issue a doCheck request every 2 seconds.
		ticker = time.Tick(time.Second * 2)
		// tracker: progress bar for this data node.
		tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault}
	)
	pw.AppendTracker(&tracker)

	// Poll until this data node's backup reaches a terminal state, then report
	// the result and return (returning ends this polling goroutine).
	for range ticker {
		status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes)
		if err != nil {
			tracker.MarkAsErrored()
			dn.Status = status
			dn.EndTime = time.Now().Unix()
			dnCh <- dn
			return
		}
		if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
			tracker.MarkAsDone()
			dn.Status = status
			dn.EndTime = time.Now().Unix()
			dnCh <- dn
			return
		}
	}
}

func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) {
	in := &model.ShowDetailIn{
		DBPort:       sn.Port,
		DBName:       sn.Database,
		Username:     sn.Username,
		Password:     sn.Password,
		DnBackupID:   backupID,
		DnBackupPath: BackupPath,
		Instance:     defaultInstance,
	}
	backupInfo, err := as.ShowDetail(in)
	if err != nil {
		if retries == 0 {
			return model.SsBackupStatusCheckError, err
		}
		time.Sleep(time.Second * 1)
		return doCheck(as, sn, backupID, retries-1)
	}
	return backupInfo.Status, nil
}
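
// doCheck gives each ShowDetail call a small retry budget (defaultShowDetailRetryTimes,
// with a one-second pause between attempts), so a single transient agent error does not
// immediately mark the whole data node as failed; only after the budget is exhausted
// does checkStatus mark its tracker as errored.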
func deleteBackupFiles(ls pkg.ILocalStorage, lsBackup *model.LsBackup) {
	var (
		dataNodeMap = make(map[string]*model.DataNode)
		totalNum    = len(lsBackup.SsBackup.StorageNodes)
		resultCh    = make(chan *model.DeleteBackupResult, totalNum)
	)

	for _, dn := range lsBackup.DnList {
		dataNodeMap[dn.IP] = dn
	}

	if totalNum == 0 {
		logging.Info("No data node has backup files to delete")
		return
	}

	pw := prettyoutput.NewPW(totalNum)
	go pw.Render()
	for _, sn := range lsBackup.SsBackup.StorageNodes {
		sn := sn
		dn, ok := dataNodeMap[sn.IP]
		if !ok {
			logging.Warn(fmt.Sprintf("SKIPPED! Data node %s:%d not found in backup info.", sn.IP, sn.Port))
			continue
		}
		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
		go doDelete(as, sn, dn, resultCh, pw)
	}

	time.Sleep(time.Millisecond * 100)
	for pw.IsRenderInProgress() {
		if pw.LengthActive() == 0 {
			pw.Stop()
		}
		time.Sleep(time.Millisecond * 100)
	}
	close(resultCh)

	t := table.NewWriter()
	t.SetOutputMirror(os.Stdout)
	t.SetTitle("Delete Backup Files Result")
	t.AppendHeader(table.Row{"#", "Node IP", "Node Port", "Result", "Message"})
	t.SetColumnConfigs([]table.ColumnConfig{{Number: 5, WidthMax: 50}})
	idx := 0
	for result := range resultCh {
		idx++
		t.AppendRow([]interface{}{idx, result.IP, result.Port, result.Status, result.Msg})
		t.AppendSeparator()
	}
	t.Render()

	if err := ls.DeleteByName(filename); err != nil {
		logging.Warn("Delete backup info file failed")
	}
	logging.Info("Delete backup files finished")
}

func doDelete(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, resultCh chan *model.DeleteBackupResult, pw progress.Writer) {
	tracker := progress.Tracker{Message: fmt.Sprintf("Deleting backup files # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault}
	pw.AppendTracker(&tracker)

	in := &model.DeleteBackupIn{
		DBPort:       sn.Port,
		DBName:       sn.Database,
		Username:     sn.Username,
		Password:     sn.Password,
		DnBackupPath: BackupPath,
		BackupID:     dn.BackupID,
		Instance:     defaultInstance,
	}
	r := &model.DeleteBackupResult{
		IP:   sn.IP,
		Port: sn.Port,
	}

	if err := as.DeleteBackup(in); err != nil {
		r.Status = model.SsBackupStatusFailed
		r.Msg = err.Error()
		resultCh <- r
		tracker.MarkAsErrored()
	} else {
		tracker.MarkAsDone()
		r.Status = model.SsBackupStatusCompleted
		resultCh <- r
	}
}
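
// deleteBackupFiles and doDelete above are only reached from the deferred cleanup in
// backup(): after a failure, the CLI makes a best-effort attempt to remove the partial
// backups from every agent and then deletes the local backup info file. Per-node delete
// errors are reported in the result table instead of aborting the cleanup.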