public CompletableFuture asyncTryLock()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java [539:636]


    public CompletableFuture<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) {
        final CompletableFuture<String> result = new CompletableFuture<String>();
        final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
        if (wait) {
            asyncTryLock(wait, result);
        } else {
            // try to check locks first
            zk.getChildren(lockPath, null, new AsyncCallback.Children2Callback() {
                @Override
                public void processResult(final int rc, String path, Object ctx,
                                          final List<String> children, Stat stat) {
                    lockStateExecutor.executeOrdered(lockPath, () -> {
                        if (!lockState.inState(State.INIT)) {
                            result.completeExceptionally(new LockStateChangedException(lockPath,
                                    lockId, State.INIT, lockState.getState()));
                            return;
                        }
                        if (KeeperException.Code.OK.intValue() != rc) {
                            result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                            return;
                        }

                        FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryAcquire);

                        Collections.sort(children, MEMBER_COMPARATOR);
                        if (children.size() > 0) {
                            asyncParseClientID(zk, lockPath, children.get(0)).whenCompleteAsync(
                                    new FutureEventListener<Pair<String, Long>>() {
                                        @Override
                                        public void onSuccess(Pair<String, Long> owner) {
                                            if (!checkOrClaimLockOwner(owner, result)) {
                                                acquireFuture.complete(false);
                                            }
                                        }

                                        @Override
                                        public void onFailure(final Throwable cause) {
                                            result.completeExceptionally(cause);
                                        }
                                    }, lockStateExecutor.chooseThread(lockPath));
                        } else {
                            asyncTryLock(wait, result);
                        }
                    });
                }
            }, null);
        }

        final CompletableFuture<Boolean> waiterAcquireFuture = FutureUtils.createFuture();
        waiterAcquireFuture.whenComplete((value, cause) -> acquireFuture.completeExceptionally(cause));
        return result.thenApply(new Function<String, LockWaiter>() {
            @Override
            public LockWaiter apply(final String currentOwner) {
                final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner);
                FutureUtils.within(
                        acquireFuture,
                        timeout,
                        unit,
                        acquireException,
                        lockStateExecutor,
                        lockPath
                ).whenComplete(new FutureEventListener<Boolean>() {

                    @Override
                    public void onSuccess(Boolean acquired) {
                        completeOrFail(acquireException);
                    }

                    @Override
                    public void onFailure(final Throwable acquireCause) {
                        completeOrFail(acquireException);
                    }

                    private void completeOrFail(final Throwable acquireCause) {
                        if (isLockHeld()) {
                            waiterAcquireFuture.complete(true);
                        } else {
                            asyncUnlock().whenComplete(new FutureEventListener<Void>() {
                                @Override
                                public void onSuccess(Void value) {
                                    waiterAcquireFuture.completeExceptionally(acquireCause);
                                }

                                @Override
                                public void onFailure(Throwable cause) {
                                    waiterAcquireFuture.completeExceptionally(acquireCause);
                                }
                            });
                        }
                    }
                });
                return new LockWaiter(
                        lockId.getLeft(),
                        currentOwner,
                        waiterAcquireFuture);
            }
        });
    }