private void watchLockOwner()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java [1195:1293]


    private void watchLockOwner(final LockWatcher lockWatcher,
                                final boolean wait,
                                final String myNode,
                                final String siblingNode,
                                final String ownerNode,
                                final Pair<String, Long> currentOwner,
                                final CompletableFuture<String> promise) {
        executeLockAction(lockWatcher.epoch, new LockAction() {
            @Override
            public void execute() {
                boolean shouldWatch;
                final boolean shouldClaimOwnership;
                if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
                    // if the current owner is the znode left from previous session
                    // we should watch it and claim ownership
                    shouldWatch = true;
                    shouldClaimOwnership = true;
                    LOG.info("LockWatcher {} for {} found its previous session {} held lock,"
                                    + " watch it to claim ownership.", myNode, lockPath, currentOwner);
                } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
                    // I found that my sibling is the current owner with same lock id (client id & session id)
                    // It must be left by any race condition from same zookeeper client
                    shouldWatch = true;
                    shouldClaimOwnership = true;
                    LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {},"
                                    + " watch it to claim ownership.",
                        myNode, lockPath, lockId, siblingNode);
                } else {
                    shouldWatch = wait;
                    if (wait) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Current LockWatcher for {} with ephemeral node {}, "
                                            + "is waiting for {} to release lock at {}.",
                                lockPath, myNode, siblingNode, System.currentTimeMillis());
                        }
                    }
                    shouldClaimOwnership = false;
                }

                // watch sibling for lock ownership
                if (shouldWatch) {
                    watchedNode = String.format("%s/%s", lockPath, siblingNode);
                    zk.exists(watchedNode, lockWatcher, new AsyncCallback.StatCallback() {
                        @Override
                        public void processResult(final int rc, String path, Object ctx, final Stat stat) {
                            executeLockAction(lockWatcher.epoch, new LockAction() {
                                @Override
                                public void execute() {
                                    if (!lockState.inState(State.PREPARED)) {
                                        promise.completeExceptionally(new LockStateChangedException(lockPath,
                                                lockId, State.PREPARED, lockState.getState()));
                                        return;
                                    }

                                    if (KeeperException.Code.OK.intValue() == rc) {
                                        if (shouldClaimOwnership) {
                                            // watch owner successfully
                                            LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
                                                myNode, lockPath, ownerNode);
                                            claimOwnership(lockWatcher.epoch);
                                            promise.complete(currentOwner.getLeft());
                                        } else {
                                            // watch sibling successfully
                                            lockState.transition(State.WAITING);
                                            promise.complete(currentOwner.getLeft());
                                        }
                                    } else if (KeeperException.Code.NONODE.intValue() == rc) {
                                        // sibling just disappeared, it might be the chance to claim ownership
                                        checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise);
                                    } else {
                                        promise.completeExceptionally(
                                                KeeperException.create(KeeperException.Code.get(rc)));
                                    }
                                }

                                @Override
                                public String getActionName() {
                                    StringBuilder sb = new StringBuilder();
                                    sb.append("postWatchLockOwner(myNode=").append(myNode).append(", siblingNode=")
                                            .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")");
                                    return sb.toString();
                                }
                            }, promise);
                        }
                    }, null);
                } else {
                    promise.complete(currentOwner.getLeft());
                }
            }

            @Override
            public String getActionName() {
                StringBuilder sb = new StringBuilder();
                sb.append("watchLockOwner(myNode=").append(myNode).append(", siblingNode=")
                        .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")");
                return sb.toString();
            }
        }, promise);
    }