pkg/common/utils/mysql/mysql.go (176 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package mysql
import (
"database/sql"
"encoding/json"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"k8s.io/klog/v2"
)
type DBConfig struct {
User string
Password string
Host string
Port string
Database string
}
type DB struct {
*sqlx.DB
}
func NewDorisSqlDB(cfg DBConfig) (*DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
db, err := sqlx.Open("mysql", dsn)
if err != nil {
klog.Errorf("NewDorisSqlDB sqlx.Open failed open doris sql client connection, err: %s \n", err.Error())
return nil, err
}
if err = db.Ping(); err != nil {
klog.Errorf("NewDorisSqlDB sqlx.Open.Ping failed ping doris sql client connection, err: %s \n", err.Error())
return nil, err
}
return &DB{db}, nil
}
func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) {
loadBalanceDBClient, err := NewDorisSqlDB(dbConf)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return nil, err
}
master, _, err := loadBalanceDBClient.GetFollowers()
if err != nil {
klog.Errorf("NewDorisMasterSqlDB GetFollowers master failed, err:%s", err.Error())
return nil, err
}
var masterDBClient *DB
if master.CurrentConnected == "Yes" {
masterDBClient = loadBalanceDBClient
} else {
// loadBalanceDBClient should be closed
defer loadBalanceDBClient.Close()
// Get the connection to the master
masterDBClient, err = NewDorisSqlDB(DBConfig{
User: dbConf.User,
Password: dbConf.Password,
Host: master.Host,
Port: dbConf.Port,
Database: "mysql",
})
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe master connection err:%s", err.Error())
return nil, err
}
}
return masterDBClient, nil
}
func (db *DB) Close() error {
return db.DB.Close()
}
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.DB.Exec(query, args...)
}
func (db *DB) Select(dest interface{}, query string, args ...interface{}) error {
return db.DB.Select(dest, query, args...)
}
func (db *DB) ShowFrontends() ([]*Frontend, error) {
var fes []*Frontend
err := db.Select(&fes, "show frontends")
return fes, err
}
func (db *DB) ShowBackends() ([]*Backend, error) {
var bes []*Backend
err := db.Select(&bes, "show backends")
return bes, err
}
func (db *DB) DecommissionBE(nodes []*Backend) error {
if len(nodes) == 0 {
klog.Infoln("mysql DecommissionBE BE node is empty")
return nil
}
nodesString := fmt.Sprintf(`"%s:%d"`, nodes[0].Host, nodes[0].HeartbeatPort)
for _, node := range nodes[1:] {
nodesString = nodesString + fmt.Sprintf(`,"%s:%d"`, node.Host, node.HeartbeatPort)
}
alter := fmt.Sprintf("ALTER SYSTEM DECOMMISSION BACKEND %s;", nodesString)
_, err := db.Exec(alter)
return err
}
func (db *DB) DropBE(nodes []*Backend) error {
if len(nodes) == 0 {
klog.Infoln("mysql DropBE BE node is empty")
return nil
}
nodesString := fmt.Sprintf(`"%s:%d"`, nodes[0].Host, nodes[0].HeartbeatPort)
for _, node := range nodes[1:] {
nodesString = nodesString + fmt.Sprintf(`,"%s:%d"`, node.Host, node.HeartbeatPort)
}
alter := fmt.Sprintf("ALTER SYSTEM DROPP BACKEND %s;", nodesString)
_, err := db.Exec(alter)
return err
}
func (db *DB) DropObserver(nodes []*Frontend) error {
if len(nodes) == 0 {
klog.Infoln("DropObserver observer node is empty")
return nil
}
var alter string
for _, node := range nodes {
alter = alter + fmt.Sprintf(`ALTER SYSTEM DROP OBSERVER "%s:%d";`, node.Host, node.EditLogPort)
}
_, err := db.Exec(alter)
return err
}
func (db *DB) GetObservers() ([]*Frontend, error) {
frontends, err := db.ShowFrontends()
if err != nil {
klog.Errorf("GetObservers show frontends failed, err: %s\n", err.Error())
return nil, err
}
var res []*Frontend
for _, fe := range frontends {
if fe.Role == FE_OBSERVE_ROLE {
res = append(res, fe)
}
}
return res, nil
}
func (db *DB) GetBackendsByCGName(cgName string) ([]*Backend, error) {
backends, err := db.ShowBackends()
if err != nil {
klog.Errorf("GetBackendsByCGName show backends failed, err: %s\n", err.Error())
return nil, err
}
var res []*Backend
for _, be := range backends {
var m map[string]interface{}
err := json.Unmarshal([]byte(be.Tag), &m)
if err != nil {
klog.Errorf("GetBackendsByCGName backends tag stirng to map failed, tag: %s, err: %s\n", be.Tag, err.Error())
return nil, err
}
if _, ok := m["compute_group_name"]; !ok {
klog.Errorf("GetBackendsByCGName backends tag get compute_group_name failed, tag: %s, err: %s\n", be.Tag, err.Error())
return nil, err
}
cgNameFromTag := fmt.Sprintf("%s", m["compute_group_name"])
if cgNameFromTag == cgName {
res = append(res, be)
}
}
return res, nil
}
// GetFollowers return fe master,all followers(including master) and err
func (db *DB) GetFollowers() (*Frontend, []*Frontend, error) {
frontends, err := db.ShowFrontends()
if err != nil {
klog.Errorf("GetFollowers show frontends failed, err: %s\n", err.Error())
return nil, nil, err
}
var res []*Frontend
var master *Frontend
for _, fe := range frontends {
if fe.Role == FE_FOLLOWER_ROLE {
res = append(res, fe)
if fe.IsMaster {
master = fe
}
}
}
return master, res, nil
}