static Connection createSender()

in geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java [967:1154]


  /**
   * Creates a sending {@link Connection} to {@code remoteAddr}, retrying until a connection is
   * constructed and its handshake completes, the remote member is given up on, or distribution is
   * cancelled.
   *
   * <p>While retrying, once {@code ackTimeout} ms have elapsed since {@code startTime} the remote
   * member is suspected via {@code mgr}. If {@code ackSATimeout > 0}, a fatal log message is
   * issued once {@code ackTimeout + ackSATimeout} ms have elapsed. A {@code startTime} of zero or
   * less is treated as "not timed": no timeout-based suspicion or alerting is performed.
   *
   * <p>The calling thread's interrupt status is cleared on entry so that an interrupt does not
   * abort connection formation, and it is restored before this method returns or throws.
   *
   * @param mgr membership service used to check whether {@code remoteAddr} still exists, is
   *        shunned, or should be suspected
   * @param t the connection table the new connection belongs to; also supplies the conduit's
   *        cancel criterion and statistics
   * @param preserveOrder passed to the {@link Connection} constructor; when {@code BATCH_SENDS}
   *        is set, an ordered connection also gets a batch send buffer
   * @param remoteAddr the member to connect to; slow-connection log messages are only emitted
   *        when this is non-null
   * @param sharedResource passed to the {@link Connection} constructor; for an unshared
   *        connection a "Too many open files" handshake failure is reported to {@code t}
   * @param startTime time (as from {@link System#currentTimeMillis()}) at which the enclosing
   *        operation started, or a value {@code <= 0} if it is not being timed
   * @param ackTimeout milliseconds after {@code startTime} before the member is suspected
   * @param ackSATimeout additional milliseconds after {@code ackTimeout} before a fatal alert is
   *        logged; {@code <= 0} disables the alert path
   * @param doNotRetry if {@code true}, give up after the first failed attempt
   * @return a connected, handshaken connection whose {@code finishedConnecting} flag is set
   * @throws IOException if the member has left, is shunned, or shutdown is in progress at the
   *         first attempt; if the member is given up on; if the SSL handshake is rejected; or if
   *         a retry would be needed while alerting or when {@code doNotRetry} is set
   * @throws DistributedSystemDisconnectedException declared for cancellation; NOTE(review): the
   *         concrete exception raised on cancellation comes from
   *         {@code checkCancelInProgress} - confirm its type
   */
  static Connection createSender(final Membership<InternalDistributedMember> mgr,
      final ConnectionTable t,
      final boolean preserveOrder, final InternalDistributedMember remoteAddr,
      final boolean sharedResource,
      final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry)
      throws IOException, DistributedSystemDisconnectedException {
    boolean success = false;
    Connection conn = null;
    // keep trying. Note that this may be executing during the shutdown window
    // where a cancel criterion has not been established, but threads are being
    // interrupted. In this case we must allow the connection to succeed even
    // though subsequent messaging using the socket may fail.
    // Thread.interrupted() clears the flag; it is restored in the outer finally below.
    boolean interrupted = Thread.interrupted();
    try {
      boolean connectionErrorLogged = false;
      long reconnectWaitTime = RECONNECT_WAIT_TIME;
      boolean suspected = false;
      boolean severeAlertIssued = false;
      boolean firstTime = true;
      boolean warningPrinted = false;
      while (!success) { // keep trying
        // Quit if DM has stopped distribution
        t.getConduit().getCancelCriterion().checkCancelInProgress(null);
        long now = System.currentTimeMillis();
        // startTime <= 0 means "not timed", matching the suspect-only branch below. Without
        // this guard the thresholds would be measured from the epoch and a bogus fatal alert
        // would be logged on the very first attempt.
        if (!severeAlertIssued && ackSATimeout > 0 && startTime > 0
            && startTime + ackTimeout < now) {
          if (startTime + ackTimeout + ackSATimeout < now) {
            if (remoteAddr != null) {
              logger.fatal("Unable to form a TCP/IP connection to {} in over {} seconds",
                  remoteAddr, (ackSATimeout + ackTimeout) / 1000);
            }
            severeAlertIssued = true;
          } else if (!suspected) {
            if (remoteAddr != null) {
              logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
                  remoteAddr, ackTimeout / 1000);
            }
            mgr.suspectMember(remoteAddr,
                "Unable to form a TCP/IP connection in a reasonable amount of time");
            suspected = true;
          }
          // Sleep no longer than the time remaining until the severe-alert threshold so the
          // fatal alert is issued promptly; fall back to the normal interval once it has passed.
          reconnectWaitTime =
              Math.min(RECONNECT_WAIT_TIME, ackSATimeout - (now - startTime - ackTimeout));
          if (reconnectWaitTime <= 0) {
            reconnectWaitTime = RECONNECT_WAIT_TIME;
          }
        } else if (!suspected && startTime > 0 && ackTimeout > 0
            && startTime + ackTimeout < now) {
          mgr.suspectMember(remoteAddr,
              "Unable to form a TCP/IP connection in a reasonable amount of time");
          suspected = true;
        }
        if (firstTime) {
          firstTime = false;
          // Fail fast before the first attempt if the member is already gone or shunned.
          if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr)
              || mgr.shutdownInProgress()) {
            throw new IOException("Member " + remoteAddr + " left the system");
          }
        } else {
          // if we're sending an alert and can't connect, bail out. A sick
          // alert listener should not prevent cache operations from continuing
          if (AlertingAction.isThreadAlerting()) {
            // do not change the text of this exception - it is looked for in exception handlers
            throw new IOException("Cannot form connection to alert listener " + remoteAddr);
          }
          if (doNotRetry) {
            throw new IOException("Connection not created in first try to " + remoteAddr);
          }
          // Wait briefly... (accumulate any interrupt so it can be restored on exit)
          interrupted = Thread.interrupted() || interrupted;
          try {
            Thread.sleep(reconnectWaitTime);
          } catch (InterruptedException ie) {
            interrupted = true;
            t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
          }
          t.getConduit().getCancelCriterion().checkCancelInProgress(null);
          if (giveUpOnMember(mgr, remoteAddr)) {
            throw new IOException(
                format("Member %s left the group", remoteAddr));
          }
          if (!warningPrinted) {
            warningPrinted = true;
            logger.warn("Connection: Attempting reconnect to peer {}",
                remoteAddr);
          }
          t.getConduit().getStats().incReconnectAttempts();
        }
        // create connection
        try {
          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
        } catch (SSLHandshakeException se) {
          // no need to retry if certificates were rejected
          throw se;
        } catch (IOException ioe) {
          // Only give up if the member leaves the view.
          if (giveUpOnMember(mgr, remoteAddr)) {
            throw ioe;
          }
          t.getConduit().getCancelCriterion().checkCancelInProgress(null);
          if ("Too many open files".equals(ioe.getMessage())) {
            t.fileDescriptorsExhausted();
          } else if (!connectionErrorLogged) {
            // log only the first failure; with short retry intervals this would otherwise
            // produce a lot of these
            connectionErrorLogged = true;
            logger.info("Connection: shared={} ordered={} failed to connect to peer {} because: {}",
                sharedResource, preserveOrder, remoteAddr,
                ioe.getCause() != null ? ioe.getCause() : ioe);
          }
        } // IOException
        finally {
          // conn is null at the start of every iteration (a failed handshake resets it), so a
          // null here means construction failed, whether by exception or otherwise.
          if (conn == null) {
            t.getConduit().getStats().incFailedConnect();
          }
        }
        if (conn != null) {
          // handshake
          try {
            conn.attemptHandshake(t);
            if (conn.isSocketClosed()) {
              // something went wrong while reading the handshake
              // and the socket was closed or we were sent
              // ShutdownMessage
              if (giveUpOnMember(mgr, remoteAddr)) {
                throw new IOException(format("Member %s left the group", remoteAddr));
              }
              t.getConduit().getCancelCriterion().checkCancelInProgress(null);
              // no success but no need to log; just retry
            } else {
              success = true;
            }
          } catch (ConnectionException e) {
            if (giveUpOnMember(mgr, remoteAddr)) {
              throw new IOException("Handshake failed", e);
            }
            t.getConduit().getCancelCriterion().checkCancelInProgress(null);
            logger.info(
                "Connection: shared={} ordered={} handshake failed to connect to peer {} because: {}",
                sharedResource, preserveOrder, remoteAddr, e);
          } catch (IOException e) {
            if (giveUpOnMember(mgr, remoteAddr)) {
              throw e;
            }
            t.getConduit().getCancelCriterion().checkCancelInProgress(null);
            logger.info(
                "Connection: shared={} ordered={} handshake failed to connect to peer {} because: {}",
                sharedResource, preserveOrder, remoteAddr, e);
            if (!sharedResource && "Too many open files".equals(e.getMessage())) {
              t.fileDescriptorsExhausted();
            }
          } finally {
            // Close a connection whose handshake did not succeed (including when an exception
            // escapes) and reset conn so the next iteration starts clean.
            if (!success) {
              try {
                conn.requestClose("failed handshake");
              } catch (Exception ignored) {
                // best effort: the connection is being discarded and we retry or rethrow anyway
              }
              conn = null;
            }
          }
        }
      } // while
      if (warningPrinted) {
        logger.info("{}: Successfully reestablished connection to peer {}",
            mgr.getLocalMember(), remoteAddr);
      }
    } finally {
      try {
        if (!success) {
          if (conn != null) {
            conn.requestClose("failed construction");
            conn = null;
          }
        }
      } finally {
        // restore the interrupt status cleared on entry or observed while sleeping
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
    // Defensive: the loop only exits with success == true, which implies conn != null.
    if (conn == null) {
      throw new ConnectionException(
          format("Connection: failed construction for peer %s", remoteAddr));
    }
    if (preserveOrder && BATCH_SENDS) {
      conn.createBatchSendBuffer();
    }
    conn.finishedConnecting = true;
    return conn;
  }