lock/lock.go (158 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
/*Package lock provides distributed lock interface.
Implementations include:
* MySQL GET_LOCK() based distributed locking
*/
package lock
import (
"database/sql"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/uber/storagetapper/db"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/types"
)
/*Lock is general distributed lock interface*/
type Lock interface {
// Try to acquire a lock. Returns false if failed.
TryLock(s string) bool
// Try to acquire a lock. Returns false if failed.
// Allows n simultaneous locks to be held
TryLockShared(s string, n int) bool
// Try to acquire a lock and wait for specified period of time for the lock
// to become available. Returns false if failed.
Lock(s string, waitDuration time.Duration) bool
// Check if we still have the lock. Try to reacquire if necessary.
// Returns false in the case of failure
Refresh() bool
// Unlock the lock. Returns false if there was failure
Unlock() bool
//Close releases resources associated with the lock
Close() bool
}
//TODO: Improve performance of sharded lock. Now we linearly try to take the
//lock for tickets from 0 to ntickets
type myLock struct {
conn *sql.DB
connID int64
name string
ci db.Addr
n int
mu sync.Mutex
isLocked bool
}
//Create an instance of Lock
//ntickets - is concurrency allowed for the lock, meaning n processes can hold the lock
//at the same time
func Create(ci *db.Addr) Lock {
return &myLock{ci: *ci}
}
func (m *myLock) log() log.Logger {
return log.WithFields(log.Fields{"lock": m.name, "ticket": m.n})
}
func (m *myLock) lockName() string {
return fmt.Sprintf("%s.%s.%d", types.MySvcName, m.name, m.n)
}
// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {
m.mu.Lock()
defer m.mu.Unlock()
var err error
var res sql.NullBool
m.name = s
for m.n = 0; m.n < ntickets; m.n++ {
if !m.createConn() {
return false
}
err = m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second).Scan(&res)
//true - success, false - timeout, NULL - error
if err == nil && res.Valid && res.Bool {
m.isLocked = true
return true
}
if log.EL(m.log(), err) {
m.closeConn()
}
}
return false
}
// Lock waits for the duration specified for the lock to be available
// Also TryLock will reuse the connection if it already exists otherwise
// it will create a new one.
func (m *myLock) Lock(s string, timeout time.Duration) bool {
return m.lock(s, timeout, 1)
}
// TryLock tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLock(s string) bool {
return m.Lock(s, 0)
}
// TryLockShared tries to take a lock and returns an error if it cannot. If the lock
// is already held then TryLock is noop.
func (m *myLock) TryLockShared(s string, n int) bool {
return m.lock(s, 0, n)
}
// IsLockedByMe checks whether the lock is held by the same connection as
// the current lock object.
func (m *myLock) IsLockedByMe() bool {
m.mu.Lock()
defer m.mu.Unlock()
var lockedBy int64
if m.conn == nil {
return false
}
err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)
if err != nil || m.connID == 0 || lockedBy != m.connID {
if err != nil {
m.log().Errorf("IsLockedByMe: error: " + err.Error())
m.closeConn()
} else {
m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
}
return false
}
return true
}
// Refresh tries to keep the lock fresh.
func (m *myLock) Refresh() bool {
if !m.isLocked {
return true
}
if m.IsLockedByMe() {
return true
}
return m.TryLock(m.name)
}
// Unlock releases locks associated with the connection
func (m *myLock) Unlock() bool {
m.mu.Lock()
defer m.mu.Unlock()
if !m.isLocked {
return false
}
m.isLocked = false
if m.conn == nil {
return false
}
var res sql.NullBool
err := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName()).Scan(&res)
if log.EL(m.log(), err) {
return false
}
if !res.Valid {
m.log().Errorf("Lock did not exists on release")
return false
}
if !res.Bool {
m.log().Errorf("Lock was not hold by me")
return false
}
return true
}
func (m *myLock) Close() bool {
return m.closeConn()
}
func (m *myLock) createConn() bool {
if m.conn != nil && m.conn.Ping() == nil {
// Reset the connection ID as Ping may have opened a new connection
m.connID, _ = m.getConnID()
return true
}
var err error
m.conn, err = db.Open(&m.ci)
if log.EL(m.ci.Log(), err) {
return false
}
m.conn.SetConnMaxLifetime(-1)
m.conn.SetMaxIdleConns(1)
m.conn.SetMaxOpenConns(1)
m.connID, _ = m.getConnID()
return true
}
func (m *myLock) closeConn() bool {
if m.conn == nil {
return true
}
res := log.E(m.conn.Close())
m.conn = nil
m.connID = 0
m.isLocked = false
return !res
}
func (m *myLock) getConnID() (int64, error) {
if m.conn == nil {
return 0, errors.New("Connection not established yet")
}
var myConnID int64
err := m.conn.QueryRow("SELECT connection_id()").Scan(&myConnID)
return myConnID, err
}