in geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java [684:882]
/**
 * Sends a membership message over the JGroups channel, either as a single multicast or as one
 * unicast per recipient.
 *
 * <p>
 * Multicast is used only when it is enabled in the configuration, the message either requests
 * it or is addressed to all members, and the manager currently allows it. Otherwise the message
 * is serialized once per distinct recipient version ordinal and a copy is unicast to each
 * recipient in shuffled order.
 *
 * @param msg the message to send; its recipients determine the destinations
 * @param reliably when {@code false} the outgoing JGroups message is flagged
 *        {@code NO_RELIABILITY}
 * @return the recipients the message could not be delivered to: those whose message failed to
 *         serialize, plus any explicit destination missing from a view installed while sending;
 *         empty when nothing failed and the message was addressed to all members
 * @throws MembershipClosedException if the channel is not connected or the channel send fails
 */
private Set<ID> send(Message<ID> msg, boolean reliably) {
  // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method
  // BUT: when marshalling messages we need to include the version of the product and
  // localAddress at the beginning of the message. These should be used in the receiver
  // code to create a versioned input stream, read the sender address, then read the message
  // and set its sender address
  MembershipStatistics theStats = services.getStatistics();
  // remember the view we started with so a view change during the send can be detected below
  GMSMembershipView<ID> oldView = view;
  if (!myChannel.isConnected()) {
    logger.info("JGroupsMessenger channel is closed - messaging is not possible");
    throw new MembershipClosedException("Distributed System is shutting down");
  }
  filterOutgoingMessage(msg);
  List<ID> destinations = msg.getRecipients();
  boolean allDestinations = msg.forAll();
  boolean useMcast = false;
  if (services.getConfig().isMulticastEnabled()) {
    if (msg.getMulticast() || allDestinations) {
      useMcast = services.getManager().isMulticastAllowed();
    }
  }
  if (logger.isDebugEnabled() && reliably) {
    String recips = useMcast ? "multicast" : destinations.toString();
    logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips);
  }
  JGAddress local = jgAddress;
  Set<ID> failedRecipients = new HashSet<>();
  if (useMcast) {
    long startSer = theStats.startMsgSerialization();
    org.jgroups.Message jmsg;
    try {
      jmsg =
          createJGMessage(msg, local, null, KnownVersion.getCurrentVersion().ordinal());
    } catch (IOException e) {
      // the message could not be serialized, so nobody received it
      return new HashSet<>(msg.getRecipients());
    } finally {
      theStats.endMsgSerialization(startSer);
    }
    try {
      jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK);
      if (!reliably) {
        jmsg.setFlag(org.jgroups.Message.Flag.NO_RELIABILITY);
      }
      theStats.incSentBytes(jmsg.getLength());
      if (logger.isTraceEnabled()) {
        logger.trace("Sending JGroups message: {}", jmsg);
      }
      myChannel.send(jmsg);
    } catch (Exception e) {
      if (logger.isDebugEnabled()) {
        logger.debug("caught unexpected exception", e);
      }
      // prefer a wrapped MemberDisconnectedException over the wrapper itself
      Throwable cause = e.getCause();
      Exception problem = (cause instanceof MemberDisconnectedException) ? (Exception) cause : e;
      problem = chainShutdownCause(problem, services.getShutdownCause());
      final String channelClosed =
          "Channel closed";
      throw new MembershipClosedException(channelClosed, problem);
    }
  } // useMcast
  else { // ! useMcast
    int len = destinations.size();
    List<ID> calculatedMembers; // explicit list of members
    int calculatedLen; // == calculatedMembers.len
    if (len == 1 && destinations.get(0) == ALL_RECIPIENTS) { // send to all
      // Grab a copy of the current membership
      GMSMembershipView<ID> v = services.getJoinLeave().getView();
      // Construct the list
      calculatedLen = v.size();
      calculatedMembers = new LinkedList<>();
      for (int i = 0; i < calculatedLen; i++) {
        ID m = v.get(i);
        calculatedMembers.add(m);
      }
    } // send to all
    else { // send to explicit list
      calculatedLen = len;
      calculatedMembers = new LinkedList<>();
      for (int i = 0; i < calculatedLen; i++) {
        calculatedMembers.add(destinations.get(i));
      }
    } // send to explicit list

    // Serialize the message once per distinct version ordinal among the recipients.
    Int2ObjectOpenHashMap<org.jgroups.Message> messages = new Int2ObjectOpenHashMap<>();
    long startSer = theStats.startMsgSerialization();
    try {
      boolean firstMessage = true;
      for (ID mbr : calculatedMembers) {
        short version = mbr.getVersionOrdinal();
        if (!messages.containsKey(version)) {
          org.jgroups.Message jmsg;
          try {
            jmsg = createJGMessage(msg, local, mbr, version);
            messages.put(version, jmsg);
          } catch (IOException e) {
            failedRecipients.add(mbr);
            continue;
          }
          // only the first serialized form is counted toward the sent-bytes statistic
          if (firstMessage) {
            theStats.incSentBytes(jmsg.getLength());
            firstMessage = false;
          }
        }
      }
    } finally {
      // always close the serialization statistic, matching the multicast path
      theStats.endMsgSerialization(startSer);
    }

    Collections.shuffle(calculatedMembers);
    for (ID mbr : calculatedMembers) {
      JGAddress to = new JGAddress(mbr);
      short version = mbr.getVersionOrdinal();
      org.jgroups.Message jmsg = messages.get(version);
      if (jmsg == null) {
        continue; // failed for all recipients
      }
      try {
        // With several recipients the serialized template is shared, so each one gets its own
        // copy; a lone recipient can use the template directly.
        org.jgroups.Message tmp = (calculatedLen > 1) ? jmsg.copy(true) : jmsg;
        if (!reliably) {
          // Flag the message that is actually sent. Flagging the template after the copy was
          // taken left the first copy for each version without NO_RELIABILITY.
          tmp.setFlag(org.jgroups.Message.Flag.NO_RELIABILITY);
        }
        tmp.setDest(to);
        tmp.setSrc(jgAddress);
        if (logger.isTraceEnabled()) {
          logger.trace("Unicasting to {}", to);
        }
        myChannel.send(tmp);
      } catch (Exception e) {
        Exception problem = chainShutdownCause(e, services.getShutdownCause());
        final String channelClosed =
            "Channel closed";
        throw new MembershipClosedException(channelClosed, problem);
      }
    } // send individually
  } // !useMcast
  // The contract is that every destination enumerated in the
  // message should have received the message. If one left
  // (i.e., left the view), we signal it here.
  if (failedRecipients.isEmpty() && msg.forAll()) {
    return Collections.emptySet();
  }
  GMSMembershipView<ID> newView = view;
  if (newView != null && newView != oldView) {
    for (ID d : destinations) {
      if (!newView.contains(d)) {
        if (logger.isDebugEnabled()) {
          logger.debug("messenger: member has left the view: {} view is now {}", d, newView);
        }
        failedRecipients.add(d);
      }
    }
  }
  return failedRecipients;
}

/**
 * Combines a send failure with the shutdown cause, if there is one.
 *
 * @param problem the exception raised while sending
 * @param shutdownCause the recorded shutdown cause, or {@code null} if there is none
 * @return {@code shutdownCause} itself when it is a {@link MemberDisconnectedException};
 *         otherwise {@code problem}, with {@code shutdownCause} appended to the end of its cause
 *         chain when that is possible
 */
private static Exception chainShutdownCause(Exception problem, Throwable shutdownCause) {
  if (shutdownCause == null) {
    return problem;
  }
  // If ForcedDisconnectException occurred then report it as actual
  // problem.
  if (shutdownCause instanceof MemberDisconnectedException) {
    return (Exception) shutdownCause;
  }
  Throwable root = problem;
  while (root.getCause() != null) {
    root = root.getCause();
  }
  // initCause rejects self-causation and refuses to overwrite a cause that was already
  // initialized (even to null); neither case should mask the real failure being reported.
  if (root != shutdownCause) {
    try {
      root.initCause(shutdownCause);
    } catch (IllegalStateException ignored) {
      // the root's cause was fixed at construction time; report the problem unchained
    }
  }
  return problem;
}