private Set send()

in geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java [684:882]


  /**
   * Sends {@code msg} over the JGroups channel, either as one multicast message or as one
   * unicast message per recipient.
   *
   * <p>
   * Multicast is used only when it is enabled in the configuration, the message either asks for
   * multicast or is addressed to all members, and the manager currently allows multicast.
   * Otherwise the message is serialized once per distinct recipient version ordinal and unicast
   * to each recipient in shuffled order.
   *
   * @param msg the message to send; its recipient list determines the destinations
   * @param reliably when {@code false} every outgoing JGroups message is flagged
   *        {@code NO_RELIABILITY}
   * @return the recipients that are considered not to have received the message: all recipients
   *         if multicast serialization fails, otherwise the members whose message could not be
   *         serialized plus any listed destination that is absent from the current view when the
   *         view object changed during the send; an empty set when nothing failed and the message
   *         was addressed to all members
   * @throws MembershipClosedException if the channel is not connected or a channel send fails
   */
  private Set<ID> send(Message<ID> msg, boolean reliably) {

    // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method

    // BUT: when marshalling messages we need to include the version of the product and
    // localAddress at the beginning of the message. These should be used in the receiver
    // code to create a versioned input stream, read the sender address, then read the message
    // and set its sender address
    MembershipStatistics theStats = services.getStatistics();
    // remember the view we started with so a view change during the send can be detected below
    GMSMembershipView<ID> oldView = view;

    if (!myChannel.isConnected()) {
      logger.info("JGroupsMessenger channel is closed - messaging is not possible");
      throw new MembershipClosedException("Distributed System is shutting down");
    }

    filterOutgoingMessage(msg);

    List<ID> destinations = msg.getRecipients();
    boolean allDestinations = msg.forAll();

    boolean useMcast = services.getConfig().isMulticastEnabled()
        && (msg.getMulticast() || allDestinations)
        && services.getManager().isMulticastAllowed();

    if (logger.isDebugEnabled() && reliably) {
      String recips = useMcast ? "multicast" : destinations.toString();
      logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips);
    }

    JGAddress local = jgAddress;

    Set<ID> failedRecipients = new HashSet<>();
    if (useMcast) {
      long startSer = theStats.startMsgSerialization();
      org.jgroups.Message jmsg;
      try {
        jmsg =
            createJGMessage(msg, local, null, KnownVersion.getCurrentVersion().ordinal());
      } catch (IOException e) {
        // the message could not be serialized, so none of its recipients will get it
        return new HashSet<>(msg.getRecipients());
      } finally {
        theStats.endMsgSerialization(startSer);
      }

      try {
        jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK);
        if (!reliably) {
          jmsg.setFlag(org.jgroups.Message.Flag.NO_RELIABILITY);
        }
        theStats.incSentBytes(jmsg.getLength());
        if (logger.isTraceEnabled()) {
          logger.trace("Sending JGroups message: {}", jmsg);
        }
        myChannel.send(jmsg);
      } catch (Exception e) {
        if (logger.isDebugEnabled()) {
          logger.debug("caught unexpected exception", e);
        }
        // prefer a wrapped MemberDisconnectedException over the wrapper itself
        Exception problem;
        Throwable cause = e.getCause();
        if (cause instanceof MemberDisconnectedException) {
          problem = (Exception) cause;
        } else {
          problem = e;
        }
        // read the shutdown cause once so the null check and its use see the same value
        Throwable shutdownCause = services.getShutdownCause();
        if (shutdownCause != null) {
          // If the shutdown was caused by a MemberDisconnectedException then report it as the
          // actual problem.
          if (shutdownCause instanceof MemberDisconnectedException) {
            problem = (Exception) shutdownCause;
          } else {
            // otherwise attach the shutdown cause at the root of the exception's cause chain
            Throwable ne = problem;
            while (ne.getCause() != null) {
              ne = ne.getCause();
            }
            ne.initCause(shutdownCause);
          }
        }
        final String channelClosed =
            "Channel closed";
        throw new MembershipClosedException(channelClosed, problem);
      }
    } // useMcast
    else { // ! useMcast
      int len = destinations.size();
      List<ID> calculatedMembers; // explicit list of members
      int calculatedLen; // == calculatedMembers.size()
      if (len == 1 && destinations.get(0) == ALL_RECIPIENTS) { // send to all
        // Grab a copy of the current membership
        GMSMembershipView<ID> v = services.getJoinLeave().getView();

        // Construct the list
        calculatedLen = v.size();
        calculatedMembers = new LinkedList<>();
        for (int i = 0; i < calculatedLen; i++) {
          calculatedMembers.add(v.get(i));
        }
      } // send to all
      else { // send to explicit list
        calculatedLen = len;
        calculatedMembers = new LinkedList<>(destinations);
      } // send to explicit list

      // Serialize the message once per distinct version ordinal; recipients sharing an ordinal
      // share the cached message.
      Int2ObjectOpenHashMap<org.jgroups.Message> messages = new Int2ObjectOpenHashMap<>();
      long startSer = theStats.startMsgSerialization();
      try {
        boolean firstMessage = true;
        for (ID mbr : calculatedMembers) {
          short version = mbr.getVersionOrdinal();
          if (!messages.containsKey(version)) {
            org.jgroups.Message jmsg;
            try {
              jmsg = createJGMessage(msg, local, mbr, version);
              messages.put(version, jmsg);
            } catch (IOException e) {
              failedRecipients.add(mbr);
              continue;
            }
            // only the first successfully serialized message is counted in the sent-bytes stat
            if (firstMessage) {
              theStats.incSentBytes(jmsg.getLength());
              firstMessage = false;
            }
          }
        }
      } finally {
        // mirror the multicast branch: always close the serialization statistic
        theStats.endMsgSerialization(startSer);
      }

      Collections.shuffle(calculatedMembers);
      int position = 0;
      for (ID mbr : calculatedMembers) {
        // Only the final recipient may use the cached message itself; every earlier send needs
        // its own copy because destination and source are set per recipient.
        boolean lastRecipient = (++position == calculatedLen);
        JGAddress to = new JGAddress(mbr);
        short version = mbr.getVersionOrdinal();
        org.jgroups.Message jmsg = messages.get(version);
        if (jmsg == null) {
          continue; // failed for all recipients
        }
        Exception problem = null;
        try {
          org.jgroups.Message tmp = lastRecipient ? jmsg : jmsg.copy(true);
          if (!reliably) {
            // flag the message that is actually sent, not the cached template
            tmp.setFlag(org.jgroups.Message.Flag.NO_RELIABILITY);
          }
          tmp.setDest(to);
          tmp.setSrc(jgAddress);
          if (logger.isTraceEnabled()) {
            logger.trace("Unicasting to {}", to);
          }
          myChannel.send(tmp);
        } catch (Exception e) {
          problem = e;
        }
        if (problem != null) {
          Throwable cause = services.getShutdownCause();
          if (cause != null) {
            // If the shutdown was caused by a MemberDisconnectedException then report it as the
            // actual problem.
            if (cause instanceof MemberDisconnectedException) {
              problem = (Exception) cause;
            } else {
              // otherwise attach the shutdown cause at the root of the exception's cause chain
              Throwable ne = problem;
              while (ne.getCause() != null) {
                ne = ne.getCause();
              }
              ne.initCause(cause);
            }
          }
          final String channelClosed =
              "Channel closed";
          throw new MembershipClosedException(channelClosed, problem);
        }
      } // send individually
    } // !useMcast

    // The contract is that every destination enumerated in the
    // message should have received the message. If one left
    // (i.e., left the view), we signal it here.
    if (failedRecipients.isEmpty() && msg.forAll()) {
      return Collections.emptySet();
    }
    GMSMembershipView<ID> newView = view;
    if (newView != null && newView != oldView) {
      for (ID d : destinations) {
        if (!newView.contains(d)) {
          if (logger.isDebugEnabled()) {
            logger.debug("messenger: member has left the view: {}  view is now {}", d, newView);
          }
          failedRecipients.add(d);
        }
      }
    }
    return failedRecipients;
  }