private boolean acquire()

in twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java [194:281]


  private boolean acquire(boolean interruptible,
                          final boolean waitForLock,
                          long timeout,
                          TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    Preconditions.checkState(lock.isHeldByCurrentThread(), "Not owner of local lock.");
    if (lock.getHoldCount() > 1) {
      // Already owner of the lock, simply return.
      return true;
    }

    // Use a Future to help deal with different variants of locking
    // (lock, lockInterruptibly, tryLock, tryLock with timeout)
    // When the completion future is completed successfully, it means the lock is acquired and the future contains
    // the ZK node path to the ephemeral node that is representing this lock.
    // If it is failed, it means there is exception while trying to acquire the lock
    // If it is cancelled, it means to abort the acquisition logic (due to timeout / interrupt).
    final SettableFuture<String> completion = SettableFuture.create();

    // If the connection expired, fail the locking process if it is still in progress
    final Cancellable watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.Expired) {
          completion.setException(new IllegalStateException("ZK session expired"));
        }
      }
    });
    // Always remove the watcher on completion
    completion.addListener(new Runnable() {
      @Override
      public void run() {
        watcherCancellable.cancel();
      }
    }, Threads.SAME_THREAD_EXECUTOR);


    // Step 1. Create a ephemeral sequential node
    final String guid = UUID.randomUUID().toString();
    final String lockPath = String.format("%s/%s-", path, guid);
    OperationFuture<String> future = zkClient.create(lockPath, null, CreateMode.EPHEMERAL_SEQUENTIAL, true);

    Futures.addCallback(future, new FutureCallback<String>() {
      @Override
      public void onSuccess(final String lockNode) {
        // If lock failed due to whatever reason, delete the lock node.
        deleteNodeOnFailure(completion, lockNode);

        // If the lock is completed (mainly due to cancellation), simply abort the lock acquisition logic.
        if (completion.isDone()) {
          return;
        }

        // Step 2-5. Try to determine who is the lock owner and watch for ZK node changes if itself is not the owner.
        doAcquire(completion, waitForLock, guid, lockNode);
      }

      @Override
      public void onFailure(Throwable t) {
        if (t instanceof KeeperException.ConnectionLossException) {
          // Ignore connection exception in create. Going to handle it in next step.
          // See the ZK receipt for details about the possible failure situation that can cause this.
          doAcquire(completion, waitForLock, guid, null);
        } else {
          LOG.error("Exception raised when creating lock node at {}", lockPath, t);
          completion.setException(t);
        }
      }
    });

    // Gets the result from the completion
    try {
      if (interruptible) {
        localLockNode.set(completion.get(timeout, unit));
      } else {
        localLockNode.set(Uninterruptibles.getUninterruptibly(completion, timeout, unit));
      }
      return true;
    } catch (InterruptedException e) {
      completion.cancel(true);
      throw e;
    } catch (TimeoutException e) {
      completion.cancel(true);
      throw e;
    } catch (CancellationException e) {
      // If the completion get cancelled, meaning the lock acquisition is aborted.
      return false;
    }
  }