in twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java [194:281]
private boolean acquire(boolean interruptible,
final boolean waitForLock,
long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
Preconditions.checkState(lock.isHeldByCurrentThread(), "Not owner of local lock.");
if (lock.getHoldCount() > 1) {
// Already owner of the lock, simply return.
return true;
}
// Use a Future to help deal with different variants of locking
// (lock, lockInterruptibly, tryLock, tryLock with timeout)
// When the completion future is completed successfully, it means the lock is acquired and the future contains
// the ZK node path to the ephemeral node that is representing this lock.
// If it is failed, it means there is exception while trying to acquire the lock
// If it is cancelled, it means to abort the acquisition logic (due to timeout / interrupt).
final SettableFuture<String> completion = SettableFuture.create();
// If the connection expired, fail the locking process if it is still in progress
final Cancellable watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Expired) {
completion.setException(new IllegalStateException("ZK session expired"));
}
}
});
// Always remove the watcher on completion
completion.addListener(new Runnable() {
@Override
public void run() {
watcherCancellable.cancel();
}
}, Threads.SAME_THREAD_EXECUTOR);
// Step 1. Create a ephemeral sequential node
final String guid = UUID.randomUUID().toString();
final String lockPath = String.format("%s/%s-", path, guid);
OperationFuture<String> future = zkClient.create(lockPath, null, CreateMode.EPHEMERAL_SEQUENTIAL, true);
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(final String lockNode) {
// If lock failed due to whatever reason, delete the lock node.
deleteNodeOnFailure(completion, lockNode);
// If the lock is completed (mainly due to cancellation), simply abort the lock acquisition logic.
if (completion.isDone()) {
return;
}
// Step 2-5. Try to determine who is the lock owner and watch for ZK node changes if itself is not the owner.
doAcquire(completion, waitForLock, guid, lockNode);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof KeeperException.ConnectionLossException) {
// Ignore connection exception in create. Going to handle it in next step.
// See the ZK receipt for details about the possible failure situation that can cause this.
doAcquire(completion, waitForLock, guid, null);
} else {
LOG.error("Exception raised when creating lock node at {}", lockPath, t);
completion.setException(t);
}
}
});
// Gets the result from the completion
try {
if (interruptible) {
localLockNode.set(completion.get(timeout, unit));
} else {
localLockNode.set(Uninterruptibles.getUninterruptibly(completion, timeout, unit));
}
return true;
} catch (InterruptedException e) {
completion.cancel(true);
throw e;
} catch (TimeoutException e) {
completion.cancel(true);
throw e;
} catch (CancellationException e) {
// If the completion get cancelled, meaning the lock acquisition is aborted.
return false;
}
}