boolean recoverDFSFileLease()

in tephra-core/src/main/java/org/apache/tephra/persist/HDFSUtil.java [81:141]


  /**
   * Repeatedly attempts to recover the lease on the given DFS file until the recovery succeeds,
   * the overall recovery timeout is reached, or the calling thread is interrupted.
   *
   * <p>Each attempt calls {@code recoverLease}. After the first unsuccessful attempt the method
   * sleeps for "hbase.lease.recovery.first.pause" milliseconds (default 3000). After every later
   * unsuccessful attempt it polls for up to "hbase.lease.recovery.dfs.timeout" milliseconds
   * (default 61000), sleeping "hbase.lease.recovery.pause" milliseconds (default 1000) between
   * polls and, when the file system class exposes an {@code isFileClosed(Path)} method, using it
   * to detect that the file has been closed in the meantime.
   *
   * @param dfs the distributed file system that holds the file
   * @param p path of the file whose lease should be recovered
   * @param conf configuration the timeout and pause settings are read from; the overall timeout
   *             is "hbase.lease.recovery.timeout" milliseconds (default 900000)
   * @return {@code true} if {@code recoverLease} or the {@code isFileClosed} check reported
   *         success; {@code false} if {@code checkIfTimedout} ended the loop first
   * @throws IOException propagated from the helper calls; an {@link InterruptedIOException}
   *                     (carrying the original {@link InterruptedException} as its cause) is
   *                     thrown if the thread is interrupted while sleeping
   */
  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
                              final Configuration conf)
    throws IOException {
    LOG.info("Recovering lease on dfs file " + p);
    final long startWaiting = System.currentTimeMillis();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'. Stored as an absolute deadline (start time + timeout).
    final long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    // This setting should be what the cluster dfs heartbeat is set to.
    final long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 3000);
    // This should be set to how long it'll take for us to timeout against primary datanode if it
    // is dead.  We set it to 61 seconds, 1 second more than the default READ_TIMEOUT in HDFS, the
    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
    final long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);

    Method isFileClosedMeth = null;
    // Whether we still need to look for the isFileClosed method; the reflective lookup is done
    // at most once per call.
    boolean findIsFileClosedMeth = true;
    boolean recovered = false;
    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
    for (int nbAttempt = 0; !recovered; nbAttempt++) {
      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
      if (recovered || checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
        break;
      }
      try {
        // On the first time through wait the short 'firstPause'.
        if (nbAttempt == 0) {
          Thread.sleep(firstPause);
        } else {
          // Cycle here until subsequentPause elapses.  While spinning, check isFileClosed if
          // available (should be in hadoop 2.0.5... not in hadoop 1 though).
          // The poll interval does not change while we spin, so read it once per attempt.
          final long pollInterval = conf.getInt("hbase.lease.recovery.pause", 1000);
          final long localStartWaiting = System.currentTimeMillis();
          while ((System.currentTimeMillis() - localStartWaiting) < subsequentPause) {
            Thread.sleep(pollInterval);
            if (findIsFileClosedMeth) {
              // Clear the flag up front so the lookup is never retried, whatever its outcome.
              findIsFileClosedMeth = false;
              try {
                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", Path.class);
              } catch (NoSuchMethodException nsme) {
                LOG.debug("isFileClosed not available");
              }
            }
            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
              // Setting 'recovered' also terminates the outer attempt loop.
              recovered = true;
              break;
            }
          }
        }
      } catch (InterruptedException ie) {
        // Surface the interruption to callers as an IOException subtype, keeping the cause.
        // The thread's interrupt flag is intentionally left as it was (not re-set here).
        InterruptedIOException iioe =
          new InterruptedIOException("Interrupted while recovering lease on dfs file " + p);
        iioe.initCause(ie);
        throw iioe;
      }
    }
    return recovered;
  }