in twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java [286:368]
private void doAcquire(final SettableFuture<String> completion, final boolean waitForLock,
final String guid, @Nullable final String lockPath) {
// Step 2. Get all children under the lock parent.
Futures.addCallback(zkClient.getChildren(path), new FutureCallback<NodeChildren>() {
@Override
public void onSuccess(NodeChildren children) {
// Find the lock node in case the creation step failed by matching the guid
// See "Recoverable Errors and the GUID" in the ZooKeeper guide
final String lockNode = lockPath == null ? findLockNode(children.getChildren(), guid) : lockPath;
if (lockNode == null) {
// If not able to find the lock node, fail the locking procedure.
completion.setException(new IllegalStateException("Failed to acquire lock").fillInStackTrace());
return;
}
if (lockPath == null) {
// If lock node was not determined in step 1 due to connection loss exception, need to add the
// node deletion handler in here after the actual lockNode is determined.
deleteNodeOnFailure(completion, lockNode);
}
// Find the node to watch, which is the one with the largest id that is smaller than currentId
// If the current id is the smallest one, nodeToWatch will be null
final String nodeToWatch = findNodeToWatch(children, lockNode, guid);
// Step 3a. lockNode is the lowest, hence this become lock owner.
if (nodeToWatch == null) {
// Acquired lock
completion.set(lockNode);
} else if (!waitForLock) {
// This is for the case of tryLock() without timeout.
completion.cancel(true);
}
// If the lock acquisition is completed, due to whatever reason, we don't need to watch for any other nodes
if (completion.isDone()) {
return;
}
// Step 3b and 4. See if the the next lowest sequence ID exists. If it does, leave a watch
// Use getData() instead of exists() to avoid leaking Watcher resources (if the node is gone, there will
// be a watch left on the ZK server if exists() is used).
OperationFuture<NodeData> getDataFuture = zkClient.getData(nodeToWatch, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!completion.isDone()) {
// If the watching node changed, go to step 2.
doAcquire(completion, waitForLock, guid, lockNode);
}
}
});
// Step 5. Depends on the exists call result, either go to step 2 if the nodeToWatch is gone or just
// let the watcher to trigger step 2 when there is change to the nodeToWatch.
Futures.addCallback(getDataFuture, new FutureCallback<NodeData>() {
@Override
public void onSuccess(NodeData nodeData) {
// No-op
}
@Override
public void onFailure(Throwable t) {
// See if the failure is due to node not exists. If that's the case, go to step 2.
if (t instanceof KeeperException.NoNodeException && !completion.isDone()) {
doAcquire(completion, waitForLock, guid, lockNode);
} else {
// If failed due to something else, fail the lock acquisition.
completion.setException(t);
}
}
});
}
@Override
public void onFailure(Throwable t) {
if (lockPath != null) {
completion.setException(t);
} else {
doAcquire(completion, waitForLock, guid, null);
}
}
});
}