pitr/cli/internal/pkg/shardingsphere-proxy.go (152 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package pkg import ( "database/sql" "encoding/base64" "encoding/json" "fmt" "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model" "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr" "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/gsutil" ) type ( shardingSphereProxy struct { db *sql.DB } IShardingSphereProxy interface { ExportMetaData() (*model.ClusterInfo, error) ExportStorageNodes() ([]*model.StorageNode, error) LockForRestore() error LockForBackup() error Unlock() error ImportMetaData(in *model.ClusterInfo) error DropDatabase(shardingDBName string) error } ) const ( DefaultDBName = "postgres" ) func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (IShardingSphereProxy, error) { db, err := gsutil.Open(user, password, dbName, host, port) if err != nil { return nil, err } if err = db.Ping(); err != nil { efmt := "db ping fail[host=%s,port=%d,user=%s,pwLen=%d,dbName=%s],err=%s" return nil, fmt.Errorf(efmt, host, port, user, len(password), dbName, err) } return &shardingSphereProxy{db: db}, nil } /* LockForBackup 停写,同时锁 CSN,备份场景使用 */ func (ss *shardingSphereProxy) LockForBackup() error { _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="WRITE", PROPERTIES("lock_csn"=true)));`) if err != nil { return xerr.NewCliErr("ss lock for backup failure") } return nil } /* LockForRestore 停读写,不需要锁 CSN,恢复场景使用 */ func (ss *shardingSphereProxy) LockForRestore() error { _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="READ_WRITE"))`) if err != nil { return xerr.NewCliErr("ss lock for restore failure") } return nil } func (ss *shardingSphereProxy) Unlock() error { _, err := ss.db.Exec("UNLOCK CLUSTER;") if err != nil { return xerr.NewCliErr("ss unlock failure") } return nil } /* ExportMetaData 导出 SS 元数据 +-----------------------------+-------------------------+----------------------------------------+ | id | create_time | data | +-------------------------------------------------------+----------------------------------------+ | 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"meta_data":{},"snapshot_info":{}} | +-------------------------------------------------------+----------------------------------------+ */ func (ss *shardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) { query, err := ss.db.Query(`EXPORT METADATA;`) if err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("export meta data failure,err=%s", err)) } var ( id string createTime string data string ) if query.Next() { if err = query.Scan(&id, &createTime, &data); err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err)) } if err = query.Close(); err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err)) } } if query.Err() != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query err=%s", query.Err())) } var out model.ClusterInfo rawDecodedText, err := base64.StdEncoding.DecodeString(data) if err != nil { return nil, fmt.Errorf("base64 decode return err=%s", err) } if err = json.Unmarshal(rawDecodedText, &out); err != nil { return nil, fmt.Errorf("json unmarshal return err=%s", err) } if out.SnapshotInfo != nil && out.SnapshotInfo.Csn == "" { out.SnapshotInfo = nil } return &out, nil } /* ExportStorageNodes 导出存储节点数据 +-----------------------------+-------------------------+--------------------------------------------+ | id | create_time | data | +-------------------------------------------------------+--------------------------------------------+ | 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":{"xx_db":[],"xx2_db":[]}} | +-------------------------------------------------------+--------------------------------------------+ */ func (ss *shardingSphereProxy) ExportStorageNodes() ([]*model.StorageNode, error) { query, err := ss.db.Query(`EXPORT STORAGE NODES;`) if err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("export storage nodes failure,err=%s", err)) } var ( id string createTime string data string ) if query.Next() { if err = query.Scan(&id, &createTime, &data); err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err)) } if err = query.Close(); err != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err)) } } if query.Err() != nil { return nil, xerr.NewCliErr(fmt.Sprintf("query err failure,err=%s", err)) } out := &model.StorageNodesInfo{} if err = json.Unmarshal([]byte(data), &out); err != nil { return nil, fmt.Errorf("json unmarshal return err=%s", err) } // get all storage nodes and filter duplicate nodes var storageNodes []*model.StorageNode var tmpNodesMap = make(map[string]struct{}) for _, v := range out.StorageNodes { for _, vv := range v { // filter duplicate nodes if _, ok := tmpNodesMap[fmt.Sprintf("%s:%d", vv.IP, vv.Port)]; ok { continue } tmpNodesMap[fmt.Sprintf("%s:%d", vv.IP, vv.Port)] = struct{}{} storageNodes = append(storageNodes, vv) } } return storageNodes, nil } // ImportMetaData 备份数据恢复 func (ss *shardingSphereProxy) ImportMetaData(in *model.ClusterInfo) error { if in == nil { return xerr.NewCliErr("import meta data is nil") } marshal, err := json.Marshal(in) if err != nil { return xerr.NewCliErr(fmt.Sprintf("json marshal,invalid data[in=%+v]", in)) } _, err = ss.db.Exec(fmt.Sprintf(`IMPORT METADATA '%s';`, base64.StdEncoding.EncodeToString(marshal))) if err != nil { return xerr.NewCliErr(fmt.Sprintf("import metadata failure,err=%s", err)) } return nil } // DropDatabase 删除数据库 func (ss *shardingSphereProxy) DropDatabase(databaseName string) error { _, err := ss.db.Exec(fmt.Sprintf(`DROP DATABASE %s;`, databaseName)) if err != nil { return xerr.NewCliErr(fmt.Sprintf("drop database failure, err:%s", err.Error())) } return nil }