public Connection getConnection()

in geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java [729:906]


  /**
   * Obtains a connection to the given member, retrying until a usable connection is found or the
   * attempt has to be abandoned.
   *
   * <p>
   * Each pass through the loop asks the connection table for a connection. A connection that is
   * closing, or whose remote address differs from {@code memberAddress}, is closed and the lookup
   * is repeated. When a lookup fails, the method gives up if the member is no longer in the
   * membership view, is shunned, or shutdown is in progress; otherwise it closes the previous
   * connection (if any), pauses briefly and tries again.
   *
   * <p>
   * The calling thread's interrupt status is cleared at the start of every pass and restored
   * before the pass ends, so an interrupt does not abort the retry loop but is still visible to
   * the caller afterwards.
   *
   * @param memberAddress the member to connect to
   * @param preserveOrder forwarded unchanged to the connection table lookup
   * @param startTime forwarded unchanged to the connection table lookup
   * @param ackTimeout forwarded unchanged to the connection table lookup
   * @param ackSATimeout forwarded unchanged to the connection table lookup
   * @return a connection that was neither closing nor bound to a different remote address when it
   *         was checked; never {@code null}
   * @throws IOException if the member is not in the view or is shunned, or if the failure is one
   *         that is not retried (a {@code ConnectionException}, or any I/O failure on an alerting
   *         thread)
   * @throws DistributedSystemDisconnectedException if the conduit is stopped or shutdown is in
   *         progress
   */
  public Connection getConnection(InternalDistributedMember memberAddress,
      final boolean preserveOrder, long startTime, long ackTimeout,
      long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
    if (stopped) {
      throw new DistributedSystemDisconnectedException("The conduit is stopped");
    }

    // Non-null once a problem has been logged at warning level; used to log the warning only
    // once and to bracket it with a matching info message when the attempt ends.
    InternalDistributedMember memberInTrouble = null;
    Connection conn = null;
    // breakLoop is set when a failure must not be retried.
    for (boolean breakLoop = false;;) {
      stopper.checkCancelInProgress(null);
      // Clear the interrupt status for this pass; it is restored in the finally block below.
      boolean interrupted = Thread.interrupted();
      try {
        // If this is the second time through this loop, we had problems.
        // Tear down the connection so that it gets rebuilt.
        if (conn != null) { // not first time in loop
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)
              || membership.shutdownInProgress()) {
            throw new IOException("TCP/IP connection lost and member is not in view");
          }
          // Member is still in view; we MUST NOT give up!

          // Pause just a tiny bit...
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            interrupted = true;
            stopper.checkCancelInProgress(e);
          }

          // try again after sleep
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)) {
            // OK, the member left. Just register an error.
            throw new IOException("TCP/IP connection lost and member is not in view");
          }

          // Print a warning (once)
          if (memberInTrouble == null) {
            memberInTrouble = memberAddress;
            logger.warn("Attempting TCP/IP reconnect to  {}", memberInTrouble);
          } else {
            if (logger.isDebugEnabled()) {
              logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
            }
          }

          // Close the connection (it will get rebuilt later).
          getStats().incReconnectAttempts();
          try {
            if (logger.isDebugEnabled()) {
              logger.debug("Closing old connection.  conn={} before retrying. memberInTrouble={}",
                  conn, memberInTrouble);
            }
            conn.closeForReconnect("closing before retrying");
          } catch (CancelException ex) {
            throw ex;
          } catch (Exception ignored) {
            // Best effort: the connection is being discarded and rebuilt below, so a failure
            // to close it cleanly must not stop the retry.
          }
        } // not first time in loop

        Exception problem = null;
        try {
          // Get (or regenerate) the connection
          // this could generate a ConnectionException, so it must be caught and retried
          boolean retryForOldConnection;
          boolean debugRetry = false;
          do {
            retryForOldConnection = false;
            conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout,
                ackSATimeout, false);
            if (conn == null) {
              // conduit may be closed - otherwise an ioexception would be thrown
              problem = new IOException(
                  String.format("Unable to reconnect to server; possible shutdown: %s",
                      memberAddress));
            } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
              if (logger.isDebugEnabled()) {
                logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn,
                    conn.hashCode());
              }
              conn.closeOldConnection("closing old connection");
              conn = null;
              retryForOldConnection = true;
              debugRetry = true;
            }
          } while (retryForOldConnection);
          if (debugRetry && logger.isDebugEnabled()) {
            logger.debug("Done removing old connections");
          }

          // we have a connection; fall through and return it
        } catch (ConnectionException e) {
          // Race condition between acquiring the connection and attempting
          // to use it: another thread closed it.
          problem = e;
          // No need to retry since Connection.createSender has already
          // done retries and now member is really unreachable for some reason
          // even though it may be in the view
          breakLoop = true;
        } catch (IOException e) {
          problem = e;
          // don't keep trying to connect to an alert listener
          if (AlertingAction.isThreadAlerting()) {
            if (logger.isDebugEnabled()) {
              logger.debug("Giving up connecting to alert listener {}", memberAddress);
            }
            breakLoop = true;
          }
        }

        if (problem != null) {
          // Some problems are not recoverable; check and error out early.
          if (!membership.memberExists(memberAddress)
              || membership.isShunned(memberAddress)) { // left the view
            // Bracket our original warning
            if (memberInTrouble != null) {
              // make this msg info to bracket warning
              logger.info("Ending reconnect attempt because {} has disappeared.", memberInTrouble);
            }
            throw new IOException(
                String.format("Peer has disappeared from view: %s", memberAddress));
          } // left the view

          if (membership.shutdownInProgress()) { // shutdown in progress
            // Bracket our original warning
            if (memberInTrouble != null) {
              // make this msg info to bracket warning
              logger.info("Ending reconnect attempt to {} because shutdown has started.",
                  memberInTrouble);
            }
            stopper.checkCancelInProgress(null);
            throw new DistributedSystemDisconnectedException(
                "Abandoned because shutdown is in progress");
          } // shutdown in progress

          // Log the warning. We wait until now, because we want
          // to have m defined for a nice message...
          if (memberInTrouble == null) {
            logger.warn("Error sending message to {} (will reattempt): {}", memberAddress, problem);
            memberInTrouble = memberAddress;
          } else {
            if (logger.isDebugEnabled()) {
              logger.debug("Error sending message to {}", memberAddress, problem);
            }
          }

          if (breakLoop) {
            if (problem instanceof IOException) {
              // getMessage() may legitimately be null (many IOExceptions carry no detail
              // message); guard so the original failure is rethrown instead of an NPE.
              String problemMessage = problem.getMessage();
              if (problemMessage != null
                  && problemMessage.startsWith("Cannot form connection to alert listener")) {
                throw new AlertingIOException((IOException) problem);
              }
              throw (IOException) problem;
            }
            throw new IOException(
                String.format("Problem connecting to %s", memberAddress), problem);
          }
          // Retry the operation (indefinitely)
          continue;
        }
        // Success!

        // Make sure our logging is bracketed if there was a problem
        if (memberInTrouble != null) {
          logger.info("Successfully reconnected to member {}", memberInTrouble);
          if (logger.isTraceEnabled()) {
            logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
          }
        }
        return conn;
      } finally {
        // Runs on return, throw and continue alike: restore the interrupt status that was
        // cleared at the top of this pass (or observed during the sleep).
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }