func()

in plugins/targetlocker/dblocker/dblocker.go [117:255]


func (d *DBLocker) handleLock(ctx xcontext.Context, jobID int64, targets []string, limit uint, timeout time.Duration, requireLocked bool, allowConflicts bool) ([]string, error) {
	if len(targets) == 0 {
		return nil, nil
	}

	// everything operates on this frozen time
	now := d.clock.Now()
	expiresAt := now.Add(timeout)

	tx, err := d.db.Begin()
	if err != nil {
		return nil, fmt.Errorf("unable to start database transaction: %w", err)
	}
	defer func() {
		// this always fails if tx.Commit() was called before, ignore error
		_ = tx.Rollback()
	}()

	locks, err := d.queryLocks(tx, targets)
	if err != nil {
		return nil, err
	}
	// go through existing locks, they are either held by something else and valid
	// (abort or skip, depending on allowConflicts setting),
	// not valid anymore (update), held by something else and not valid (expired),
	// held by us or not held at all (insert)
	var toInsert, missing []string
	var toDelete, conflicts []dblock
	for _, targetID := range targets {
		lock, ok := locks[targetID]
		switch {
		case !ok: // nonexistent lock
			if requireLocked {
				missing = append(missing, targetID)
			}
			toInsert = append(toInsert, targetID)
		case lock.jobID == jobID: // our lock, possibly expired
			toDelete = append(toDelete, lock)
			toInsert = append(toInsert, targetID)
		case lock.expiresAt.Before(now): // other job's expired lock
			if !requireLocked {
				toDelete = append(toDelete, lock)
				toInsert = append(toInsert, targetID)
			} else {
				conflicts = append(conflicts, lock)
			}
		default:
			conflicts = append(conflicts, lock)
		}
		if uint(len(toInsert)) >= limit {
			break
		}
	}
	if (len(conflicts) > 0 && !allowConflicts) || len(missing) > 0 {
		return nil, fmt.Errorf("unable to lock targets %v for owner %d, have %d conflicting locks (%v), %d missing locks (%v)",
			targets, jobID, len(conflicts), conflicts, len(missing), missing)
	}

	// First, drop all the locks that we intend to extend or take over.
	// Use strict matching so that if another instance races ahead of us, row will not be deleted and subsequent insert will fail.
	{
		var stmt []string
		var args []interface{}
		for i, lock := range toDelete {
			if len(stmt) == 0 {
				stmt = append(stmt, "DELETE FROM locks WHERE (target_id = ? AND job_id = ? AND expires_at = ?)")
			} else {
				stmt = append(stmt, " OR (target_id = ? AND job_id = ? AND expires_at = ?)")
			}
			args = append(args, lock.targetID, lock.jobID, lock.expiresAt)
			if len(stmt) < d.maxBatchSize && i < len(toDelete)-1 {
				continue
			}
			if _, err := tx.Exec(strings.Join(stmt, ""), args...); err != nil {
				return nil, fmt.Errorf("insert statement failed: %w", err)
			}
			stmt = nil
			args = nil
		}
	}

	// Now insert new entries for all the targets we are locking.
	{
		var stmt []string
		var args []interface{}
		for i, targetID := range toInsert {
			createdAt := now
			// If we are updating our own lock, carry over the creation timestamp.
			if lock, ok := locks[targetID]; ok && lock.jobID == jobID {
				createdAt = lock.createdAt
			}
			if len(stmt) == 0 {
				// this can race with other transactions, acceptable for TryLock
				if allowConflicts {
					stmt = append(stmt, "INSERT IGNORE INTO locks (target_id, job_id, created_at, expires_at, valid) VALUES (?, ?, ?, ?, ?)")
				} else {
					stmt = append(stmt, "INSERT INTO locks (target_id, job_id, created_at, expires_at, valid) VALUES (?, ?, ?, ?, ?)")
				}
			} else {
				stmt = append(stmt, ", (?, ?, ?, ?, ?)")
			}
			args = append(args, targetID, jobID, createdAt, expiresAt, true)
			if len(stmt) < d.maxBatchSize && i < len(toInsert)-1 {
				continue
			}
			if _, err := tx.Exec(strings.Join(stmt, ""), args...); err != nil {
				return nil, fmt.Errorf("insert statement failed: %w", err)
			}
			stmt = nil
			args = nil
		}
	}

	// Main transaction done
	txErr := tx.Commit()
	// Done except for TryLock
	if txErr != nil || !allowConflicts || len(toInsert) == 0 {
		return toInsert, txErr
	}

	// TryLock uses INSERT IGNORE, read inserted rows back to see which ones made it
	var actualInserts []string
	{
		actualLocks, err := d.queryLocks(d.db, toInsert)
		if err != nil {
			return nil, err
		}

		for _, targetID := range toInsert {
			lock, ok := actualLocks[targetID]
			// only care about locks that we own now
			if ok && lock.jobID == jobID {
				actualInserts = append(actualInserts, targetID)
			}
		}
	}

	return actualInserts, nil
}