private int sendToMany()

in geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java [221:412]


  /**
   * Sends {@code msg} to the given destinations, looping until no destination is left that needs
   * another attempt.
   *
   * <p>
   * On the first pass, members for which {@code getConnections} reports a failure are queued for a
   * retry. On a retry pass, such members are recorded as failed instead. Failures reported by the
   * message streamer, or while reading direct acks, are always queued for another pass. Before
   * each additional pass the conduit's cancel criterion is checked.
   *
   * <p>
   * The calling thread's interrupt flag is cleared at the top of every pass and restored before
   * this method exits. Every connection that was written to is released with
   * {@code setInUse(false, ...)} on exit, whether the method returns or throws.
   *
   * @param mgr passed through to {@code getConnections}
   * @param p_destinations the members to send to
   * @param msg the message to send
   * @param ackWaitThreshold multiplied by 1000 to obtain the ack timeout (presumably seconds
   *        converted to milliseconds); only used when the message is a {@code DirectReplyMessage}
   *        or has a processor id greater than zero
   * @param ackSAThreshold multiplied by 1000 to obtain the severe-alert timeout; only used when
   *        severe-alert processing applies to the message or is forced
   * @return the byte count reported by the first write that returned a non-zero value; bytes
   *         written by later retry passes are not added
   * @throws ConnectExceptions if at least one member was recorded as failed
   * @throws NotSerializableException if writing the message raised it
   * @throws InternalGemFireException if any other {@code IOException} occurs while writing the
   *         message or closing the streamer
   */
  private int sendToMany(final Membership mgr,
      InternalDistributedMember[] p_destinations,
      final DistributionMessage msg, long ackWaitThreshold, long ackSAThreshold)
      throws ConnectExceptions, NotSerializableException {
    InternalDistributedMember[] destinations = p_destinations;

    // Collects connect exceptions that happened during previous attempts to send.
    // These represent members we are not able to distribute to.
    ConnectExceptions failedCe = null;
    // Describes the destinations that we need to retry the send to.
    ConnectExceptions retryInfo = null;
    int bytesWritten = 0;
    boolean retry = false;
    final boolean orderedMsg = msg.orderedDelivery() || Connection.isDominoThread();
    // Connections we actually sent messages to. Typed as Object because the streamer hands its
    // sent connections back as List<?>.
    final List<Object> totalSentCons = new ArrayList<>(destinations.length);
    boolean interrupted = false;

    long ackTimeout = 0;
    long ackSDTimeout = 0;
    long startTime = 0;
    final DirectReplyMessage directMsg =
        (msg instanceof DirectReplyMessage) ? (DirectReplyMessage) msg : null;
    if (directMsg != null || msg.getProcessorId() > 0) {
      // Keep this in long arithmetic: narrowing the millisecond value to int wraps for
      // thresholds above Integer.MAX_VALUE / 1000 and yields a bogus (possibly negative) timeout,
      // which would make the "ackTimeout > 0" check below skip ack-wait tracking altogether.
      ackTimeout = ackWaitThreshold * 1000L;
      if (msg.isSevereAlertCompatible() || ReplyProcessor21.isSevereAlertProcessingForced()) {
        ackSDTimeout = ackSAThreshold * 1000L;
        if (ReplyProcessor21.getShortSevereAlertProcessing()) {
          ackSDTimeout = (long) (ReplyProcessor21.PR_SEVERE_ALERT_RATIO * ackSDTimeout);
        }
      }
    }

    boolean directReply =
        directMsg != null && directMsg.supportsDirectAck() && threadOwnsResources();

    // If this is a direct reply message, but we are sending it
    // over the shared socket, tell the message it needs to
    // use a regular reply processor.
    if (!directReply && directMsg != null) {
      directMsg.registerProcessor();
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
          msg, p_destinations.length, Arrays.toString(p_destinations));
    }

    try {
      do {
        // Remember (and clear) a pending interrupt so it can be restored on exit.
        interrupted = Thread.interrupted() || interrupted;
        if (retryInfo != null) {
          // A previous pass left members to retry: they become this pass's destinations.
          final List<?> retryMembers = retryInfo.getMembers();
          destinations = retryMembers.toArray(new InternalDistributedMember[retryMembers.size()]);
          retryInfo = null;
          retry = true;
        }
        final List<Connection> cons = new ArrayList<>(destinations.length);
        ConnectExceptions ce = getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout,
            ackSDTimeout, cons);

        if (directReply && msg.getProcessorId() > 0) { // no longer a direct-reply message?
          directReply = false;
        }
        if (ce != null) {
          if (!retry) {
            // First pass: give these members one more chance.
            retryInfo = ce;
          } else {
            // Already a retry pass: these members are considered unreachable.
            failedCe = mergeConnectExceptions(failedCe, ce);
          }
          ce = null;
        }
        if (cons.isEmpty()) {
          if (failedCe != null) {
            throw failedCe;
          }
          if (retryInfo != null) {
            // Jumps to the loop condition, which is true, so another pass starts.
            continue;
          }
          return bytesWritten;
        }

        if (logger.isDebugEnabled()) {
          logger.debug("{} on these {} connections: {}",
              (retry ? "Retrying send" : "Sending"), cons.size(), cons);
        }
        DMStats stats = getDMStats();
        List<?> sentCons; // used for cons we sent to this time

        final BaseMsgStreamer ms =
            MsgStreamer.create(cons, msg, directReply, stats, bufferPool);
        try {
          startTime = 0;
          if (ackTimeout > 0) {
            startTime = System.currentTimeMillis();
          }
          ms.reserveConnections(startTime, ackTimeout, ackSDTimeout);

          int result = ms.writeMessage();
          if (bytesWritten == 0) {
            // bytesWritten only needs to be set once.
            // if we have to do a retry we don't want to count
            // each one's bytes.
            bytesWritten = result;
          }
          ce = ms.getConnectExceptions();
          sentCons = ms.getSentConnections();

          totalSentCons.addAll(sentCons);
        } catch (NotSerializableException e) {
          throw e;
        } catch (IOException ex) {
          throw new InternalGemFireException(
              "Unknown error serializing message",
              ex);
        } finally {
          // NOTE(review): an IOException from close() replaces any exception already propagating
          // from the try block above. Left as is to keep the exception callers observe unchanged.
          try {
            ms.close();
          } catch (IOException e) {
            throw new InternalGemFireException("Unknown error serializing message", e);
          }
        }

        // Members the streamer could not write to are queued for another pass.
        retryInfo = mergeConnectExceptions(retryInfo, ce);
        ce = null;

        if (directReply && !sentCons.isEmpty()) {
          long readAckStart = 0;
          if (stats != null) {
            readAckStart = stats.startReplyWait();
          }
          try {
            ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce,
                directMsg.getDirectReplyProcessor());
          } finally {
            if (stats != null) {
              stats.endReplyWait(readAckStart, startTime);
            }
          }
        }

        // Members that failed while reading acks are queued for another pass as well.
        retryInfo = mergeConnectExceptions(retryInfo, ce);
        if (retryInfo != null) {
          conduit.getCancelCriterion().checkCancelInProgress(null);
        }
      } while (retryInfo != null);
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
      for (final Object totalSentCon : totalSentCons) {
        Connection con = (Connection) totalSentCon;
        con.setInUse(false, 0, 0, 0, null);
      }
    }
    if (failedCe != null) {
      throw failedCe;
    }
    return bytesWritten;
  }

  /**
   * Folds {@code latest} into {@code accumulated} by appending its members and causes.
   *
   * @param accumulated the exceptions gathered so far, or {@code null} if there are none yet
   * @param latest the exceptions to add, or {@code null} if there is nothing to add
   * @return {@code accumulated} (mutated in place) when both are non-null; otherwise whichever
   *         argument is non-null, or {@code null} when both are {@code null}
   */
  private ConnectExceptions mergeConnectExceptions(final ConnectExceptions accumulated,
      final ConnectExceptions latest) {
    if (latest == null) {
      return accumulated;
    }
    if (accumulated == null) {
      return latest;
    }
    accumulated.getMembers().addAll(latest.getMembers());
    accumulated.getCauses().addAll(latest.getCauses());
    return accumulated;
  }