state/lock.go (130 lines of code) (raw):

// Copyright (c) 2018 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package state import ( "database/sql" "time" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/util" ) func getTableTask(workerID string, lockTimeout time.Duration) (*Row, error) { tx, err := mgr.conn.Begin() if log.E(err) { return nil, err } defer func() { _ = tx.Rollback() }() var id int64 var add string if CheckMySQLVersion("8") { add = " SKIP LOCKED" } ir := util.QueryTxRowSQL(tx, `SELECT id FROM state WHERE deleted_at IS NULL AND (locked_at IS NULL OR locked_at < NOW() - INTERVAL ? SECOND) AND (input = 'mysql' OR snapshotted_at IS NULL OR need_snapshot OR (params->"$.Schedule.Interval" > 0 AND snapshotted_at + INTERVAL params->"$.Schedule.Interval" SECOND < NOW())) LIMIT 1 FOR UPDATE`+add, lockTimeout/time.Second) err = ir.Scan(&id) if err != nil && err == sql.ErrNoRows { return nil, nil // no tasks } if log.E(err) { return nil, err } r, err := util.QueryTxSQL(tx, stateSQL+` WHERE state.id=?`, id) if log.E(err) { return nil, err } srow, err := parseRows(r) if log.E(err) { return nil, err } if len(srow) == 0 { return nil, nil // no tasks } row := &srow[0] tm := time.Now().Round(time.Second) if row.TimeForSnapshot(tm) { err = advanceSnapshottedAt(tx, row.ID, tm) if log.E(err) { return nil, err } row.NeedSnapshot = true row.SnapshottedAt = tm } err = util.ExecTxSQL(tx, `UPDATE state SET locked_at=NOW(), worker_id=? WHERE id=?`, workerID, row.ID) if log.E(err) { return nil, err } err = tx.Commit() if log.E(err) { return nil, err } return row, nil } //GetTableTask picks first available task which needs attention and which lock is expired //The task is locked by the given workerID and lock time is set to current time func GetTableTask(workerID string, lockTimeout time.Duration) (r *Row, err error) { for i := 0; i < trxRetryCount; i++ { r, err = getTableTask(workerID, lockTimeout) if err == nil || !isRetriableErr(err) { return } } return } //RefreshTableLock updates the lock time of the given task and worker //Returns false if the task is not locked by given worker anymore func RefreshTableLock(stateID int64, workerID string) bool { // `IF(locked_at=NOW(),NOW()+1,NOW())` is to handle the case when we call // Refresh(Cluster/Table)Lock lock immediately after locking and locked_at is equal to NOW(). // We update it with NOW()+1 in order to still get RowsAffected() = 1 back. // This helps to avoid complex transaction logic to handle that bound case. res, err := mgr.conn.Exec(`UPDATE state SET locked_at=IF(locked_at=NOW(),NOW()+INTERVAL 1 SECOND,NOW()) WHERE id=? AND worker_id=?`, stateID, workerID) if log.E(err) { return false } n, err := res.RowsAffected() if log.E(err) { return false } if n == 1 { return true } return false } func getClusterTask(input string, workerID string, lockTimeout time.Duration) (string, string, string, error) { tx, err := mgr.conn.Begin() if log.E(err) { return "", "", "", err } defer func() { _ = tx.Rollback() }() var service, cluster, db, add string if CheckMySQLVersion("8") { add = " OF c SKIP LOCKED" } err = util.QueryTxRowSQL(tx, `SELECT DISTINCT s.service, c.cluster, s.db FROM cluster_state c join state s USING(cluster) WHERE s.deleted_at IS NULL AND s.input=? AND (c.locked_at IS NULL OR c.locked_at < NOW() - INTERVAL ? SECOND) LIMIT 1 FOR UPDATE`+add, input, lockTimeout/time.Second).Scan(&service, &cluster, &db) if err != nil && err == sql.ErrNoRows { return "", "", "", nil // no clusters } if log.E(err) { return "", "", "", err } err = util.ExecTxSQL(tx, `UPDATE cluster_state SET locked_at=NOW(), worker_id=? WHERE cluster=?`, workerID, cluster) if log.E(err) { return "", "", "", err } err = tx.Commit() if log.E(err) { return "", "", "", err } return service, cluster, db, err } //GetClusterTask pick first available cluster task and locks it func GetClusterTask(input string, workerID string, lockTimeout time.Duration) (s string, c string, d string, err error) { for i := 0; i < trxRetryCount; i++ { s, c, d, err = getClusterTask(input, workerID, lockTimeout) if err == nil || !isRetriableErr(err) { return } } return } //RefreshClusterLock updates the lock time of the given cluster and worker //Returns false if the task is not locked by given worker anymore func RefreshClusterLock(cluster string, workerID string) bool { res, err := mgr.conn.Exec(`UPDATE cluster_state SET locked_at=IF(locked_at=NOW(),NOW()+INTERVAL 1 SECOND,NOW()) WHERE cluster=? AND worker_id=?`, cluster, workerID) if log.E(err) { return false } n, err := res.RowsAffected() if log.E(err) { return false } if n == 1 { return true } return false }