in stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java [539:636]
// Asynchronously tries to acquire the lock at lockPath.
//
// Parameters:
//   timeout - how long to wait for the acquisition; when equal to
//             DistributedLogConstants.LOCK_IMMEDIATE the current lock members are
//             inspected first instead of going straight to asyncTryLock(wait, result).
//   unit    - time unit of timeout.
//
// Returns a future that yields a LockWaiter once `result` (passed on as `currentOwner`) is known.
// The waiter's acquire future completes with true when isLockHeld() reports the lock as held
// after the (time-bounded) acquire attempt finishes; otherwise asyncUnlock() is invoked and the
// waiter's future fails with an OwnershipAcquireFailedException built from lockPath and the owner.
//
// The returned future fails with LockStateChangedException when the lock is not in State.INIT
// at the time the children listing is processed, with a KeeperException when the listing
// returns a non-OK code, or with whatever failure asyncParseClientID reports.
public CompletableFuture<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) {
    final CompletableFuture<String> result = new CompletableFuture<>();
    final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout;
    if (wait) {
        asyncTryLock(wait, result);
    } else {
        // try to check locks first
        zk.getChildren(lockPath, null, new AsyncCallback.Children2Callback() {
            @Override
            public void processResult(final int rc, String path, Object ctx,
                                      final List<String> children, Stat stat) {
                // Handle the listing on the executor keyed by lockPath.
                lockStateExecutor.executeOrdered(lockPath, () -> {
                    if (!lockState.inState(State.INIT)) {
                        result.completeExceptionally(new LockStateChangedException(lockPath,
                                lockId, State.INIT, lockState.getState()));
                        return;
                    }
                    // The return code is checked before `children` is touched.
                    if (KeeperException.Code.OK.intValue() != rc) {
                        result.completeExceptionally(
                                KeeperException.create(KeeperException.Code.get(rc)));
                        return;
                    }

                    FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryAcquire);

                    Collections.sort(children, MEMBER_COMPARATOR);
                    if (!children.isEmpty()) {
                        // Inspect the first member after sorting with MEMBER_COMPARATOR.
                        asyncParseClientID(zk, lockPath, children.get(0)).whenCompleteAsync(
                                new FutureEventListener<Pair<String, Long>>() {
                                    @Override
                                    public void onSuccess(Pair<String, Long> owner) {
                                        // NOTE(review): a false return presumably means the lock
                                        // belongs to someone else and was not claimed - confirm
                                        // against checkOrClaimLockOwner.
                                        if (!checkOrClaimLockOwner(owner, result)) {
                                            acquireFuture.complete(false);
                                        }
                                    }

                                    @Override
                                    public void onFailure(final Throwable cause) {
                                        result.completeExceptionally(cause);
                                    }
                                }, lockStateExecutor.chooseThread(lockPath));
                    } else {
                        // No members at all: go through the regular try-lock path.
                        asyncTryLock(wait, result);
                    }
                });
            }
        }, null);
    }

    final CompletableFuture<Boolean> waiterAcquireFuture = FutureUtils.createFuture();
    // Propagate a failure (including cancellation) of the waiter's future to acquireFuture.
    // whenComplete also fires on normal completion with a null cause, and
    // CompletableFuture.completeExceptionally(null) throws NullPointerException, so only
    // forward a real cause.
    waiterAcquireFuture.whenComplete((value, cause) -> {
        if (null != cause) {
            acquireFuture.completeExceptionally(cause);
        }
    });
    return result.thenApply(new Function<String, LockWaiter>() {
        @Override
        public LockWaiter apply(final String currentOwner) {
            final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner);
            // Bound the acquire attempt by the caller's timeout; acquireException is the
            // exception handed to FutureUtils.within for that bound.
            FutureUtils.within(
                    acquireFuture,
                    timeout,
                    unit,
                    acquireException,
                    lockStateExecutor,
                    lockPath
            ).whenComplete(new FutureEventListener<Boolean>() {

                @Override
                public void onSuccess(Boolean acquired) {
                    completeOrFail(acquireException);
                }

                @Override
                public void onFailure(final Throwable acquireCause) {
                    // The waiter is always failed with acquireException, not with acquireCause.
                    completeOrFail(acquireException);
                }

                private void completeOrFail(final Throwable acquireCause) {
                    if (isLockHeld()) {
                        waiterAcquireFuture.complete(true);
                    } else {
                        // Unlock first, then fail the waiter whether or not the unlock succeeded.
                        asyncUnlock().whenComplete(new FutureEventListener<Void>() {
                            @Override
                            public void onSuccess(Void value) {
                                waiterAcquireFuture.completeExceptionally(acquireCause);
                            }

                            @Override
                            public void onFailure(Throwable cause) {
                                waiterAcquireFuture.completeExceptionally(acquireCause);
                            }
                        });
                    }
                }
            });
            return new LockWaiter(
                    lockId.getLeft(),
                    currentOwner,
                    waiterAcquireFuture);
        }
    });
}