in plugins/targetlocker/dblocker/dblocker.go [117:255]
func (d *DBLocker) handleLock(ctx xcontext.Context, jobID int64, targets []string, limit uint, timeout time.Duration, requireLocked bool, allowConflicts bool) ([]string, error) {
if len(targets) == 0 {
return nil, nil
}
// everything operates on this frozen time
now := d.clock.Now()
expiresAt := now.Add(timeout)
tx, err := d.db.Begin()
if err != nil {
return nil, fmt.Errorf("unable to start database transaction: %w", err)
}
defer func() {
// this always fails if tx.Commit() was called before, ignore error
_ = tx.Rollback()
}()
locks, err := d.queryLocks(tx, targets)
if err != nil {
return nil, err
}
// go through existing locks, they are either held by something else and valid
// (abort or skip, depending on allowConflicts setting),
// not valid anymore (update), held by something else and not valid (expired),
// held by us or not held at all (insert)
var toInsert, missing []string
var toDelete, conflicts []dblock
for _, targetID := range targets {
lock, ok := locks[targetID]
switch {
case !ok: // nonexistent lock
if requireLocked {
missing = append(missing, targetID)
}
toInsert = append(toInsert, targetID)
case lock.jobID == jobID: // our lock, possibly expired
toDelete = append(toDelete, lock)
toInsert = append(toInsert, targetID)
case lock.expiresAt.Before(now): // other job's expired lock
if !requireLocked {
toDelete = append(toDelete, lock)
toInsert = append(toInsert, targetID)
} else {
conflicts = append(conflicts, lock)
}
default:
conflicts = append(conflicts, lock)
}
if uint(len(toInsert)) >= limit {
break
}
}
if (len(conflicts) > 0 && !allowConflicts) || len(missing) > 0 {
return nil, fmt.Errorf("unable to lock targets %v for owner %d, have %d conflicting locks (%v), %d missing locks (%v)",
targets, jobID, len(conflicts), conflicts, len(missing), missing)
}
// First, drop all the locks that we intend to extend or take over.
// Use strict matching so that if another instance races ahead of us, row will not be deleted and subsequent insert will fail.
{
var stmt []string
var args []interface{}
for i, lock := range toDelete {
if len(stmt) == 0 {
stmt = append(stmt, "DELETE FROM locks WHERE (target_id = ? AND job_id = ? AND expires_at = ?)")
} else {
stmt = append(stmt, " OR (target_id = ? AND job_id = ? AND expires_at = ?)")
}
args = append(args, lock.targetID, lock.jobID, lock.expiresAt)
if len(stmt) < d.maxBatchSize && i < len(toDelete)-1 {
continue
}
if _, err := tx.Exec(strings.Join(stmt, ""), args...); err != nil {
return nil, fmt.Errorf("insert statement failed: %w", err)
}
stmt = nil
args = nil
}
}
// Now insert new entries for all the targets we are locking.
{
var stmt []string
var args []interface{}
for i, targetID := range toInsert {
createdAt := now
// If we are updating our own lock, carry over the creation timestamp.
if lock, ok := locks[targetID]; ok && lock.jobID == jobID {
createdAt = lock.createdAt
}
if len(stmt) == 0 {
// this can race with other transactions, acceptable for TryLock
if allowConflicts {
stmt = append(stmt, "INSERT IGNORE INTO locks (target_id, job_id, created_at, expires_at, valid) VALUES (?, ?, ?, ?, ?)")
} else {
stmt = append(stmt, "INSERT INTO locks (target_id, job_id, created_at, expires_at, valid) VALUES (?, ?, ?, ?, ?)")
}
} else {
stmt = append(stmt, ", (?, ?, ?, ?, ?)")
}
args = append(args, targetID, jobID, createdAt, expiresAt, true)
if len(stmt) < d.maxBatchSize && i < len(toInsert)-1 {
continue
}
if _, err := tx.Exec(strings.Join(stmt, ""), args...); err != nil {
return nil, fmt.Errorf("insert statement failed: %w", err)
}
stmt = nil
args = nil
}
}
// Main transaction done
txErr := tx.Commit()
// Done except for TryLock
if txErr != nil || !allowConflicts || len(toInsert) == 0 {
return toInsert, txErr
}
// TryLock uses INSERT IGNORE, read inserted rows back to see which ones made it
var actualInserts []string
{
actualLocks, err := d.queryLocks(d.db, toInsert)
if err != nil {
return nil, err
}
for _, targetID := range toInsert {
lock, ok := actualLocks[targetID]
// only care about locks that we own now
if ok && lock.jobID == jobID {
actualInserts = append(actualInserts, targetID)
}
}
}
return actualInserts, nil
}