state/state.go (655 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package state
import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pkg/errors"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/db"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/schema"
"github.com/uber/storagetapper/types"
"github.com/uber/storagetapper/util"
)
// All the fields needed to be fetched from the state tables to prepare in-memory representation of state
const (
stateSQL = "SELECT state.id, state.cluster, state.service, state.db, state.table_name, " +
"state.input, state.output, state.output_format, state.version, cluster_state.gtid, cluster_state.seqno, cluster_state.updated_at cs_updated_at, " +
"raw_schema.schema_gtid, raw_schema.raw_schema, state.snapshotted_at, state.deleted_at, state.need_snapshot, state.params " +
"FROM state INNER JOIN cluster_state USING(cluster) INNER JOIN raw_schema ON raw_schema.state_id = state.id"
regSQL = "SELECT cluster, service, db, table_name, input, output, output_format, version, snapshotted_at, need_snapshot, created_at, updated_at, params FROM registrations WHERE deleted_at IS NULL"
trxRetryCount = 5
)
//Row represents a row in the state table
type Row struct {
types.TableLoc
ID int64
OutputFormat string
Gtid string
GtidUpdatedAt time.Time
SeqNo uint64
SchemaGtid string
RawSchema string
SnapshottedAt time.Time
CreatedAt time.Time
UpdatedAt time.Time
NeedSnapshot bool
Deleted bool
ParamsRaw string
Params *config.TableParams
}
//Type is in-memory representation of state
type Type []Row
// TimeForSnapshot determines if row requires snapshot to be taken
func (r *Row) TimeForSnapshot(now time.Time) bool {
return r.SnapshottedAt.IsZero() || r.Params.Schedule.Interval != 0 && r.SnapshottedAt.Add(r.Params.Schedule.Interval).Before(now)
}
// SnapshotTimeChanged determines if row has SnapshottedAt updated
func (r *Row) SnapshotTimeChanged(prev time.Time) bool {
return prev.Round(time.Second) != r.SnapshottedAt
}
func tableLocLog(t *types.TableLoc) log.Logger {
return log.WithFields(log.Fields{"service": t.Service, "cluster": t.Cluster,
"db": t.DB, "table": t.Table, "input": t.Input, "output": t.Output,
"version": t.Version})
}
//GetDB returns active db connection to the state
func GetDB() *sql.DB {
return mgr.conn
}
//GetDBAddr return low level address of the database: Host, Port, User, Password
func GetDBAddr() *db.Addr {
return mgr.dbAddr
}
//GetNoDB returns active connection to the state database server
//Without connecting to any specific db
func GetNoDB() *sql.DB {
return mgr.nodbconn
}
func readTableParams(rp sql.NullString, r *Row) error {
r.ParamsRaw = ""
if rp.Valid {
r.ParamsRaw = rp.String
}
r.Params = &config.Get().TableParams //point to default
if r.ParamsRaw != "" && r.ParamsRaw != "{}" {
tp := config.Get().TableParams.CopyForMerge()
if err := json.Unmarshal([]byte(r.ParamsRaw), &tp); err != nil {
return err
}
tp.MergeCompound(r.Params)
tp.Schedule.Interval *= time.Second
r.Params = tp
}
return nil
}
func parseRows(rows *sql.Rows) (Type, error) {
defer func() { log.E(rows.Close()) }()
res := make(Type, 0)
var r Row
for rows.Next() {
var rp sql.NullString
var sn, del, guat sql.NullTime
if err := rows.Scan(&r.ID, &r.Cluster, &r.Service, &r.DB, &r.Table, &r.Input, &r.Output, &r.OutputFormat, &r.Version, &r.Gtid,
&r.SeqNo, &guat, &r.SchemaGtid, &r.RawSchema, &sn, &del, &r.NeedSnapshot, &rp); err != nil {
return nil, err
}
r.SnapshottedAt = time.Time{}
if sn.Valid {
r.SnapshottedAt = sn.Time
}
r.GtidUpdatedAt = time.Time{}
if guat.Valid {
r.GtidUpdatedAt = guat.Time
}
r.Deleted = false
if del.Valid {
r.Deleted = true
}
if err := readTableParams(rp, &r); err != nil {
return nil, err
}
res = append(res, r)
}
if err := rows.Err(); err != nil {
return nil, err
}
return res, nil
}
func parseRegRows(rows *sql.Rows) (Type, error) {
defer func() { log.E(rows.Close()) }()
res := make(Type, 0)
var r Row
var sa, ca, ua sql.NullTime
var rp sql.NullString
var ns int
for rows.Next() {
if err := rows.Scan(&r.Cluster, &r.Service, &r.DB, &r.Table, &r.Input, &r.Output, &r.OutputFormat, &r.Version, &sa, &ns, &ca, &ua, &rp); err != nil {
return nil, err
}
r.NeedSnapshot = false
if ns > 0 {
r.NeedSnapshot = true
}
r.SnapshottedAt = time.Time{}
if sa.Valid {
r.SnapshottedAt = sa.Time
}
if ca.Valid {
r.CreatedAt = ca.Time
}
if ua.Valid {
r.UpdatedAt = ua.Time
}
if err := readTableParams(rp, &r); err != nil {
return nil, err
}
res = append(res, r)
}
if err := rows.Err(); err != nil {
return nil, err
}
return res, nil
}
//GetCondLow returns state rows with given condition in the state. Allows to
//select rows with deleted flag set
func GetCondLow(deleted bool, cond string, args ...interface{}) (Type, error) {
if len(cond) != 0 {
cond = " AND " + cond
}
sd := "IS"
if deleted {
sd = "IS NOT"
}
rows, err := util.QuerySQL(mgr.conn, stateSQL+" WHERE deleted_at "+sd+" NULL "+cond, args...)
if err != nil {
return nil, err
}
return parseRows(rows)
}
//GetCond returns state rows with given condition in the state, rows with deleted flags are ignored
func GetCond(cond string, args ...interface{}) (Type, error) {
return GetCondLow(false, cond, args...)
}
//GetRegCond returns state rows with given condition in the state, rows with deleted flags are ignored
func GetRegCond(cond string, args ...interface{}) (Type, error) {
if len(cond) != 0 && !strings.HasPrefix(cond, " LIMIT") {
cond = " AND " + cond
}
rows, err := util.QuerySQL(mgr.conn, regSQL+cond, args...)
if err != nil {
return nil, err
}
return parseRegRows(rows)
}
//GetForCluster returns state rows for given cluster name
func GetForCluster(cluster string) (Type, error) {
return GetCond("cluster=?", cluster)
}
//GetTable returns state rows for given service,cluster,db,table,input
func GetTable(service, cluster, db, table, input string, output string, version int) (*Row, error) {
st, err := GetCond("service=? AND cluster=? AND db=? AND table_name=? AND input=? AND output=? AND version=?", service, cluster, db, table, input, output, version)
if err != nil {
return nil, err
}
if len(st) < 1 {
return nil, nil
}
return &st[0], nil
}
//Get returns all the rows in the state corresponding to non-deleted tables
func Get() (Type, error) {
return GetCond("")
}
//GetCount returns number of rows in the state
func GetCount(includeDeleted bool) (int, error) {
query := "SELECT count(*) AS count FROM state"
if !includeDeleted {
query += " WHERE deleted_at IS NULL"
}
var cnt int
err := util.QueryRowSQL(mgr.conn, query).Scan(&cnt)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return cnt, nil
}
//GetTableByID return state row for the given table id
func GetTableByID(id int64) (*Row, error) {
rows, err := util.QuerySQL(mgr.conn, stateSQL+" WHERE state.id=? AND deleted_at IS NULL", id)
if err != nil {
return nil, err
}
tblRows, err := parseRows(rows)
if err != nil {
return nil, err
}
if len(tblRows) == 0 {
return nil, nil
}
return &tblRows[0], nil
}
//GetGTID returns GTID saved in the state for given db locator
func GetGTID(cluster string) (gtid string, seqno int64, err error) {
// Get first non empty gtid for the cluster
err = util.QueryRowSQL(mgr.conn, "SELECT gtid, seqno FROM cluster_state WHERE cluster=?", cluster).Scan(>id, &seqno)
return
}
//SetGTID saves given gtid for given db locator
func SetGTID(cluster, gtid string) error {
return util.ExecSQL(mgr.conn, "UPDATE cluster_state SET gtid=? WHERE cluster=?", gtid, cluster)
}
//SaveBinlogState saves current state of the binlog reader to the state DB.
//Binlog state is current GTID set and current seqNo
func SaveBinlogState(cluster, gtid string, seqNo uint64) error {
return util.ExecSQL(mgr.conn, "UPDATE cluster_state SET gtid=?, seqno=? WHERE cluster=?", gtid, seqNo, cluster)
}
//GetNeedSnapshotFlag returns need_snapshot flag saved in the STATE
func GetNeedSnapshotFlag(id int64) (ns bool, err error) {
err = util.QueryRowSQL(mgr.conn, "SELECT need_snapshot FROM state WHERE id=?", id).Scan(&ns)
return ns, err
}
func getNeedSnapshotFlag(tx *sql.Tx, id int64) (regid int64, ns bool, err error) {
err = util.QueryTxRowSQL(tx, "SELECT reg_id, need_snapshot FROM state WHERE id=? FOR UPDATE", id).Scan(®id, &ns)
return
}
func clearNeedSnapshotFlag(tx *sql.Tx, id int64, ts time.Time) error {
regid, ns, err := getNeedSnapshotFlag(tx, id)
if err != nil {
return err
}
err = util.ExecTxSQL(tx, "UPDATE state SET need_snapshot=FALSE WHERE id=? AND snapshotted_at=?", id, ts)
if err != nil {
return err
}
sub := 0
if ns { //Subtract from registrations only if we changed state value
sub = 1
}
return util.ExecTxSQL(tx, "UPDATE registrations SET need_snapshot=IFNULL(need_snapshot,0)-? WHERE id=?", sub, regid)
}
func undeleteStateRow(tx *sql.Tx, id int64, outputFormat, params string) error {
err := util.ExecTxSQL(tx, "UPDATE state SET snapshotted_at=NULL,need_snapshot=FALSE,deleted_at=NULL,output_format=?,params=? WHERE id=?", outputFormat, params, id)
return err
}
func deleteStateRow(tx *sql.Tx, t *types.TableLoc) error {
err := util.ExecTxSQL(tx, "UPDATE state SET deleted_at=CURRENT_TIMESTAMP WHERE service=? AND cluster=? AND db=? AND table_name=? AND input=? AND output=? AND version=?", t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version)
return err
}
func advanceSnapshottedAt(tx *sql.Tx, id int64, start time.Time) error {
regid, ns, err := getNeedSnapshotFlag(tx, id)
if err != nil {
return err
}
err = util.ExecTxSQL(tx, "UPDATE state SET snapshotted_at=?, need_snapshot=TRUE WHERE id=? AND (snapshotted_at IS NULL OR snapshotted_at < ?)", start, id, start)
if err != nil {
return err
}
add := 0
if !ns { //increase values in regs only if we increased it in state
add = 1
}
return util.ExecTxSQL(tx, "UPDATE registrations SET snapshotted_at=?, need_snapshot=IFNULL(need_snapshot,0)+? WHERE id=? AND (snapshotted_at IS NULL OR snapshotted_at <= ?)", start, add, regid, start)
}
func updateSnapshottedAt(id int64, start time.Time) error {
tx, err := mgr.conn.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
err = advanceSnapshottedAt(tx, id, start)
if err != nil {
return err
}
return tx.Commit()
}
//UpdateSnapshottedAt advances snapshotted_at to be no older then TableParams.Schedule.Interval from
//now
func UpdateSnapshottedAt(row *Row, tm time.Time) (*Row, error) {
tm = tm.Round(time.Second)
if row.TimeForSnapshot(tm) {
err := updateSnapshottedAt(row.ID, tm)
if log.E(err) {
return nil, err
}
row, err = GetTableByID(row.ID)
if log.E(err) {
return nil, err
}
tm = time.Now().Round(time.Second)
}
return row, nil
}
//ClearNeedSnapshot clears flag indicating that streamer has taken a snapshot of table
func ClearNeedSnapshot(id int64, ts time.Time) error {
tx, err := mgr.conn.Begin()
if err != nil {
return err
}
defer func() { _ = tx.Rollback() }()
err = clearNeedSnapshotFlag(tx, id, ts)
if err != nil {
return err
}
return tx.Commit()
}
//GetSchema returns structured schema saved in the state for give table
func GetSchema(svc, sdb, table, input string, output string, version int) (*types.TableSchema, error) {
query := "SELECT column_name, ordinal_position, is_nullable, data_type, " +
"character_maximum_length, numeric_precision, numeric_scale, column_type, " +
"column_key from columns " +
"INNER JOIN state ON state.id = columns.state_id " +
"WHERE state.deleted_at is null AND state.db = ? AND state.table_name = ? AND state.service = ? AND state.input = ? AND state.output = ? AND state.version = ? " +
"GROUP BY service, db, table_name, input, output, version, column_name " +
"ORDER BY ordinal_position"
rows, err := mgr.conn.Query(query, sdb, table, svc, input, output, version)
if err != nil {
log.E(errors.Wrap(err, fmt.Sprintf("Error fetching column information for svc: %v, db: %v, table: %v",
svc, sdb, table)))
return nil, err
}
defer func() { log.E(rows.Close()) }()
return schema.ParseColumnInfo(rows, sdb, table)
}
func getID(tx *sql.Tx, t *types.TableLoc) (id int64, schemaGTID string, needSnapshot bool, err error) {
err = tx.QueryRow("SELECT id, need_snapshot, schema_gtid FROM state INNER JOIN raw_schema ON raw_schema.state_id = state.id "+
"WHERE service=? AND cluster=? AND db=? AND table_name=? AND input=? AND output=? "+
"AND version=? FOR UPDATE", t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version).Scan(&id, &needSnapshot, &schemaGTID)
if log.E(err) {
return 0, "", false, err
}
return id, schemaGTID, needSnapshot, nil
}
func replaceStructuredSchema(tx *sql.Tx, id int64, s *types.TableSchema) error {
if _, err := tx.Exec("DELETE FROM columns WHERE state_id=?", id); log.E(err) {
return err
}
for _, c := range s.Columns {
if _, err := tx.Exec("INSERT INTO columns(state_id,column_name,ordinal_position,is_nullable,data_type,character_maximum_length,numeric_precision,numeric_scale,column_type,column_key) VALUES (?,?,?,?,?,?,?,?,?,?)", id, c.Name,
c.OrdinalPosition, c.IsNullable, c.DataType, c.CharacterMaximumLength, c.NumericPrecision, c.NumericScale,
c.Type, c.Key); log.E(err) {
return err
}
}
return nil
}
func replaceRawSchema(tx *sql.Tx, oldGTID string, newGTID string, t *types.TableLoc, rawSchema string, s *types.TableSchema) (int64, error) {
log.Debugf("Replacing raw schema for table %+v input=%v output=%v version=%v", t.Table, t.Input, t.Output, t.Version)
var stateGTID string
var id int64
id, stateGTID, _, err := getID(tx, t)
if err != nil {
return 0, err
}
if oldGTID != "" && stateGTID != oldGTID {
// This can happen because of the race condition between concurrent binlog
// readers from the same master DB. This is just means that we are behind of him
// and that's ok to skip persisting the schema, so as this version has been
// persisted already by concurrent binlog reader earlier
tableLocLog(t).WithFields(log.Fields{"current_gtid": oldGTID, "new_gtid": newGTID, "state_gtid": stateGTID}).Warnf("Newer schema version found in the state")
return id, fmt.Errorf("newer schema found in state")
}
if _, err := tx.Exec("UPDATE raw_schema SET raw_schema=?, schema_gtid=? WHERE state_id=?", rawSchema, newGTID, id); log.E(err) {
return 0, err
}
return id, nil
}
func clearClusterState(tx *sql.Tx, cluster string) error {
if _, err := tx.Exec("UPDATE cluster_state SET gtid='' WHERE cluster=?", cluster); log.E(err) {
return err
}
log.Infof("Old state cleared for cluster: %v", cluster)
return nil
}
func insertNewSchema(tx *sql.Tx, newGTID string, t *types.TableLoc, regid int64, format, params, rawSchema string, s *types.TableSchema) (int64, error) {
log.Debugf("Inserting schema for table %+v", t)
if params == "" {
params = "{}"
}
var cnt int64
//Guarantee that row exists for given cluster
if err := util.ExecSQL(mgr.conn, "INSERT IGNORE INTO cluster_state(cluster,gtid) VALUES (?,'')", t.Cluster); log.E(err) {
return 0, err
}
//Lock the cluster row
var unused string
if err := tx.QueryRow("SELECT gtid FROM cluster_state WHERE cluster=? FOR UPDATE", t.Cluster).Scan(&unused); log.E(err) {
return 0, err
}
if err := tx.QueryRow("SELECT COUNT(*) FROM state WHERE cluster=? AND deleted_at IS NULL", t.Cluster).Scan(&cnt); log.E(err) {
return 0, err
}
//First table re-registered for the cluster clear old state
if cnt == 0 {
if err := clearClusterState(tx, t.Cluster); err != nil {
return 0, err
}
}
insRes, err := tx.Exec("INSERT INTO state (reg_id,service,cluster,db,table_name,input,output,version,output_format,params) VALUES (?,?,?,?,?,?,?,?,?,?)", regid, t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, format, params)
if err != nil {
if !isDuplicateKeyErr(err) {
log.E(err)
return 0, err
}
//This is only safe todo when changelog reader is catched up
id, err := replaceRawSchema(tx, "", newGTID, t, rawSchema, s)
if err != nil {
return 0, err
}
if err = undeleteStateRow(tx, id, format, params); log.E(err) {
return 0, err
}
return id, nil
}
id, err := insRes.LastInsertId()
if log.E(err) {
return 0, err
}
if _, err = tx.Exec("INSERT INTO raw_schema(state_id,schema_gtid,raw_schema) VALUES (?,?,?)",
id, newGTID, rawSchema); log.E(err) {
return 0, err
}
return id, nil
}
//replaceSchema replaces both structured and raw schema definitions saved in the
//state with new versions provided as parameters. If old GTID is empty adds it as new table to the state
func replaceSchemaLow(svc, cluster string, s *types.TableSchema, regid int64, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) error {
t := &types.TableLoc{Service: svc, Cluster: cluster, DB: s.DBName, Table: s.TableName, Input: input, Output: output, Version: version}
tx, err := mgr.conn.Begin()
if log.E(err) {
return err
}
defer func() { _ = tx.Rollback() }() // noop if already committed
var id int64
if oldGTID != "" { //Changelog reader sees ALTER statement
if id, err = replaceRawSchema(tx, oldGTID, newGTID, t, rawSchema, s); err != nil {
return err
}
} else { //Table registered by API
if id, err = insertNewSchema(tx, newGTID, t, regid, format, params, rawSchema, s); err != nil {
return err
}
}
if err = replaceStructuredSchema(tx, id, s); err != nil {
return err
}
if err = tx.Commit(); log.E(err) {
return err
}
tableLocLog(t).Debugf("Updated schema version from=%v, to=%v", oldGTID, newGTID)
return nil
}
func replaceSchema(svc, cluster string, s *types.TableSchema, regid int64, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) bool {
for i := 0; i < trxRetryCount; i++ {
err := replaceSchemaLow(svc, cluster, s, regid, rawSchema, oldGTID, newGTID, input, output, version, format, params)
if err == nil {
return true
}
if !isRetriableErr(err) {
return false
}
}
return false
}
//insertStateRow inserts new state row, updates and undelete if the row already
//exists in state
//used in new rable registration and updating existing through API
func insertStateRow(svc, cluster string, s *types.TableSchema, regid int64, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) bool {
return replaceSchema(svc, cluster, s, regid, rawSchema, oldGTID, newGTID, input, output, version, format, params)
}
//ReplaceSchema replaces both structured and raw schema definitions saved in the
//state with new versions provided as parameters
//Called from changelog reader on ALTER table statement
func ReplaceSchema(svc, cluster string, s *types.TableSchema, rawSchema, oldGTID, newGTID, input, output string, version int, format string, params string) bool {
return replaceSchema(svc, cluster, s, 0, rawSchema, oldGTID, newGTID, input, output, version, format, params)
}
//TableRegistered checks if given table is listed in registrations table
func TableRegistered(svc, cluster, sdb, table, input, output string, version int, includeDeleted bool) (bool, error) {
cluster, svc, sdb, table = SanitizeRegParams(cluster, svc, sdb, table, input)
query := "SELECT COUNT(*) FROM registrations WHERE service=? AND cluster=? AND db=? AND table_name=? " +
"AND input=? AND output=? AND version=?"
if !includeDeleted {
query += " AND deleted_at IS NULL"
}
var idRes int64
if err := util.QueryRowSQL(mgr.conn, query, svc, cluster, sdb, table, input, output, version).Scan(&idRes); err != nil {
return false, err
}
return idRes > 0, nil
}
//RegisterTable adds table for registration. The entry here is subsequently processed later on to update the state
//table
func RegisterTable(cluster, svc, sdb, table, input, output string, version int, outputFormat string, params string) bool {
cluster, svc, sdb, table = SanitizeRegParams(cluster, svc, sdb, table, input)
if params == "" {
params = "{}"
}
if !ValidateRegistration(svc, sdb, table, input, output, version) {
return false
}
err := util.ExecSQL(mgr.conn, "INSERT INTO registrations(cluster, service, db, table_name, input, output, version, output_format, params) "+
"VALUES(?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE deleted_at=NULL,snapshotted_at=NULL,need_snapshot=0,output_format=?,params=?,sync_state=?", cluster, svc, sdb, table, input, output, version, outputFormat, params, outputFormat, params, regStateUnsynced)
if log.E(err) {
return false
}
log.Infof("Registration added: cluster:%v service:%v db:%v tableName:%v, input:%v, output:%v, version:%v, format:%v, params:%v",
cluster, svc, sdb, table, input, output, version, outputFormat, params)
return true
}
//DeregisterTable removes given table from the state
func DeregisterTable(cluster, svc, sdb, table, input, output string, version int) bool {
cluster, svc, sdb, table = SanitizeRegParams(cluster, svc, sdb, table, input)
if !ValidateRegistration(svc, sdb, table, input, output, version) {
return false
}
var args = make([]interface{}, 0)
args = append(args, regStateUnsynced)
var cond string
cond, args = AddSQLCond(cond, args, "AND", "cluster", "=", cluster)
cond, args = AddSQLCond(cond, args, "AND", "service", "=", svc)
cond, args = AddSQLCond(cond, args, "AND", "db", "=", sdb)
cond, args = AddSQLCond(cond, args, "AND", "table_name", "=", table)
cond, args = AddSQLCond(cond, args, "AND", "input", "=", input)
cond, args = AddSQLCond(cond, args, "AND", "output", "=", output)
cond, args = AddSQLCond(cond, args, "AND", "version", "=", strconv.Itoa(version))
if len(cond) > 0 {
cond = " AND " + cond
}
if log.E(util.ExecSQL(mgr.conn, "UPDATE registrations SET deleted_at=CURRENT_TIMESTAMP, sync_state=? WHERE 1=1"+cond, args...)) {
return false
}
log.Infof("Deregisteration successful, service: %v, cluster: %v, DB: %v, table: %v, input: %v, output: %v, version: %v",
svc, cluster, sdb, table, input, output, version)
return true
}
//ValidateRegistration validates the registration request
func ValidateRegistration(svc, sdb, table, input, output string, version int) bool {
if input != types.InputMySQL {
log.Errorf("Incorrect input type provided: %v", input)
return false
}
if svc == "" {
log.Errorf("Service name not provided or incorrect service: %v", svc)
return false
}
if output == "" {
log.Errorf("Incorrect output type provided: %v", output)
return false
}
switch input {
case types.InputMySQL:
if sdb == "" {
log.Errorf("DB name not provided or incorrect for input: %v, db: %v", types.InputMySQL, sdb)
return false
}
}
log.Debugf("Registration request valid for svc: %v, db: %v, table: %v, input: %v, version: %v", svc, sdb,
table, input, version)
return true
}
//SanitizeRegParams sanitizes the registration options
func SanitizeRegParams(cluster, svc, sdb, table, input string) (string, string, string, string) {
cluster = strings.Trim(cluster, " *")
svc = strings.Trim(svc, " *")
sdb = strings.Trim(sdb, " *")
table = strings.Trim(table, " *")
return cluster, svc, sdb, table
}
//TableRegisteredInState checks if table with given id is still registered in the state
//ID is id field from row structure
func TableRegisteredInState(id int64) (bool, error) {
var idRes sql.NullInt64
err := util.QueryRowSQL(mgr.conn, "SELECT id FROM state WHERE id=? AND deleted_at IS NULL", id).Scan(&idRes)
if err != nil || !idRes.Valid || idRes.Int64 != id {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
//PullCurrentSchema pulls current table schema from the source MySQL cluster
func PullCurrentSchema(dbl *db.Loc, table, input string) (*types.TableSchema, string) {
ts, err := schema.Get(dbl, table, input)
if log.E(err) {
return nil, ""
}
rawSchema, err := schema.GetRaw(dbl, "`"+dbl.Name+"`.`"+table+"`", input)
if log.E(err) {
return nil, ""
}
return ts, rawSchema
}
//RegisterTableInState adds table to the state
func RegisterTableInState(dbl *db.Loc, table, input, output string, version int, format string, params string, regid int64) bool {
sgtid, err := db.GetCurrentGTIDForDB(dbl, input)
if log.E(err) {
return false
}
ts, rawSchema := PullCurrentSchema(dbl, table, input)
if ts == nil {
return false
}
if !insertStateRow(dbl.Service, dbl.Cluster, ts, regid, rawSchema, "", sgtid, input, output, version, format, params) {
return false
}
log.Infof("Registered table: %v, %v, %v, %v, %v, %v, %v", dbl.Service, dbl.Name, table, input, output,
version, format)
return true
}
//DeregisterTableFromState removes given table from the state
func DeregisterTableFromState(dbl *db.Loc, table, input, output string, version int, regid int64) bool {
tx, err := mgr.conn.Begin()
if log.E(err) {
return false
}
defer func() { _ = tx.Rollback() }()
err = deleteStateRow(tx, &types.TableLoc{Service: dbl.Service, Cluster: dbl.Cluster, DB: dbl.Name, Table: table, Input: input, Output: output, Version: version})
if log.E(err) {
return false
}
if err := tx.Commit(); log.E(err) {
return false
}
return true
}
//Close uninitializes the state
func Close() error {
log.Debugf("DB uninitialized")
err := mgr.conn.Close()
log.E(err)
return err
}
// GetServerTimestamp fetches the current unix timestamp on the MySQL server
func GetServerTimestamp() (int64, error) {
var res int64
if err := util.QueryRowSQL(mgr.conn, "SELECT UNIX_TIMESTAMP()").Scan(&res); err != nil {
return 0, err
}
return res, nil
}
// AddSQLCond is a helper function that helps with creating conditional queries over state tables
func AddSQLCond(cond string, args []interface{}, lop string, name string, op string, val string) (string, []interface{}) {
if len(val) != 0 && val != "*" {
if len(cond) != 0 && !strings.HasSuffix(cond, "(") {
cond += lop + " "
}
cond += name + " " + op + " " + "? "
args = append(args, val)
}
return cond, args
}
func isDuplicateKeyErr(err error) bool {
return util.MySQLError(err, gmysql.ER_DUP_ENTRY)
}
func isRetriableErr(err error) bool {
return util.MySQLError(err, gmysql.ER_LOCK_DEADLOCK) || util.MySQLError(err, gmysql.ER_LOCK_WAIT_TIMEOUT)
}
// TableMaxVersion returns maximum version for the table
func TableMaxVersion(svc, cluster, sdb, table, input, output string) (int, error) {
cluster, svc, sdb, table = SanitizeRegParams(cluster, svc, sdb, table, input)
query := "SELECT MAX(version) FROM registrations WHERE service=? AND cluster=? AND db=? AND table_name=? AND input=? AND output=?"
var max int
if err := util.QueryRowSQL(mgr.conn, query, svc, cluster, sdb, table, input, output).Scan(&max); err != nil {
return 0, err
}
return max, nil
}
var cachedStateMySQLVersion atomic.Value
// CheckMySQLVersion check is state db cluster is of given version
func CheckMySQLVersion(expected string) bool {
version, _ := cachedStateMySQLVersion.Load().(string)
if version == "" {
err := GetNoDB().QueryRow("SELECT @@global.version").Scan(&version)
if log.E(err) {
return false
}
cachedStateMySQLVersion.Store(version)
}
return strings.HasPrefix(version, expected)
}