in stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java [1195:1293]
private void watchLockOwner(final LockWatcher lockWatcher,
final boolean wait,
final String myNode,
final String siblingNode,
final String ownerNode,
final Pair<String, Long> currentOwner,
final CompletableFuture<String> promise) {
executeLockAction(lockWatcher.epoch, new LockAction() {
@Override
public void execute() {
boolean shouldWatch;
final boolean shouldClaimOwnership;
if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
// if the current owner is the znode left from previous session
// we should watch it and claim ownership
shouldWatch = true;
shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found its previous session {} held lock,"
+ " watch it to claim ownership.", myNode, lockPath, currentOwner);
} else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
// I found that my sibling is the current owner with same lock id (client id & session id)
// It must be left by any race condition from same zookeeper client
shouldWatch = true;
shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {},"
+ " watch it to claim ownership.",
myNode, lockPath, lockId, siblingNode);
} else {
shouldWatch = wait;
if (wait) {
if (LOG.isDebugEnabled()) {
LOG.debug("Current LockWatcher for {} with ephemeral node {}, "
+ "is waiting for {} to release lock at {}.",
lockPath, myNode, siblingNode, System.currentTimeMillis());
}
}
shouldClaimOwnership = false;
}
// watch sibling for lock ownership
if (shouldWatch) {
watchedNode = String.format("%s/%s", lockPath, siblingNode);
zk.exists(watchedNode, lockWatcher, new AsyncCallback.StatCallback() {
@Override
public void processResult(final int rc, String path, Object ctx, final Stat stat) {
executeLockAction(lockWatcher.epoch, new LockAction() {
@Override
public void execute() {
if (!lockState.inState(State.PREPARED)) {
promise.completeExceptionally(new LockStateChangedException(lockPath,
lockId, State.PREPARED, lockState.getState()));
return;
}
if (KeeperException.Code.OK.intValue() == rc) {
if (shouldClaimOwnership) {
// watch owner successfully
LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
myNode, lockPath, ownerNode);
claimOwnership(lockWatcher.epoch);
promise.complete(currentOwner.getLeft());
} else {
// watch sibling successfully
lockState.transition(State.WAITING);
promise.complete(currentOwner.getLeft());
}
} else if (KeeperException.Code.NONODE.intValue() == rc) {
// sibling just disappeared, it might be the chance to claim ownership
checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
} else {
promise.completeExceptionally(
KeeperException.create(KeeperException.Code.get(rc)));
}
}
@Override
public String getActionName() {
StringBuilder sb = new StringBuilder();
sb.append("postWatchLockOwner(myNode=").append(myNode).append(", siblingNode=")
.append(siblingNode).append(", ownerNode=").append(ownerNode).append(")");
return sb.toString();
}
}, promise);
}
}, null);
} else {
promise.complete(currentOwner.getLeft());
}
}
@Override
public String getActionName() {
StringBuilder sb = new StringBuilder();
sb.append("watchLockOwner(myNode=").append(myNode).append(", siblingNode=")
.append(siblingNode).append(", ownerNode=").append(ownerNode).append(")");
return sb.toString();
}
}, promise);
}