in geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java [221:412]
/**
 * Sends {@code msg} to every member in {@code p_destinations}, retrying members whose send
 * failed.
 *
 * <p>
 * Each pass of the loop obtains connections for the current destinations via
 * {@code getConnections}, streams the message over them, and, when the message is being sent as
 * a direct reply, reads the acks on the connections that were written to. Failure handling:
 * <ul>
 * <li>Connect failures reported by {@code getConnections} on the first pass are scheduled for a
 * retry; on later passes they are accumulated and thrown once the loop ends.</li>
 * <li>Failures reported by the message streamer or by {@code readAcks} are always scheduled for
 * another pass.</li>
 * </ul>
 * Before returning or throwing, every connection that was written to is released with
 * {@code setInUse(false, ...)} and the thread's interrupt status is restored if it was cleared
 * while sending.
 *
 * @param mgr membership passed through to {@code getConnections}
 * @param p_destinations the members to send to
 * @param msg the message to send
 * @param ackWaitThreshold ack-wait threshold; multiplied by 1000 to produce the ack timeout
 *        (presumably seconds converted to milliseconds, verify against the caller)
 * @param ackSAThreshold severe-alert threshold; multiplied by 1000 to produce the severe-alert
 *        timeout (same unit assumption as {@code ackWaitThreshold})
 * @return the byte count reported by the first write that returned a non-zero count, or 0 if
 *         nothing was written
 * @throws ConnectExceptions if some members could not be reached on a retry pass
 * @throws NotSerializableException if writing the message raised it
 */
private int sendToMany(final Membership mgr,
    InternalDistributedMember[] p_destinations,
    final DistributionMessage msg, long ackWaitThreshold, long ackSAThreshold)
    throws ConnectExceptions, NotSerializableException {
  InternalDistributedMember[] destinations = p_destinations;

  // Collects connect exceptions that happened during previous attempts to send.
  // These represent members we are not able to distribute to.
  ConnectExceptions failedCe = null;
  // Describes the destinations that we need to retry the send to.
  ConnectExceptions retryInfo = null;
  int bytesWritten = 0;
  boolean retry = false;
  final boolean orderedMsg = msg.orderedDelivery() || Connection.isDominoThread();

  // Connections we actually sent messages to. Elements are Connection instances; the list is
  // typed as Object because the streamer hands its sent connections back as List<?>.
  final List<Object> totalSentCons = new ArrayList<>(destinations.length);
  boolean interrupted = false;
  long ackTimeout = 0;
  long ackSDTimeout = 0;
  long startTime = 0;

  final DirectReplyMessage directMsg =
      msg instanceof DirectReplyMessage ? (DirectReplyMessage) msg : null;

  if (directMsg != null || msg.getProcessorId() > 0) {
    // Keep the arithmetic in long: narrowing the product to int would wrap for thresholds
    // above Integer.MAX_VALUE / 1000 and yield a wrong, possibly negative, timeout.
    ackTimeout = ackWaitThreshold * 1000;
    if (msg.isSevereAlertCompatible() || ReplyProcessor21.isSevereAlertProcessingForced()) {
      ackSDTimeout = ackSAThreshold * 1000;
      if (ReplyProcessor21.getShortSevereAlertProcessing()) {
        ackSDTimeout = (long) (ReplyProcessor21.PR_SEVERE_ALERT_RATIO * ackSDTimeout);
      }
    }
  }

  boolean directReply =
      directMsg != null && directMsg.supportsDirectAck() && threadOwnsResources();
  // If this is a direct reply message, but we are sending it
  // over the shared socket, tell the message it needs to
  // use a regular reply processor.
  if (!directReply && directMsg != null) {
    directMsg.registerProcessor();
  }

  if (logger.isDebugEnabled()) {
    logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
        msg, p_destinations.length, Arrays.toString(p_destinations));
  }

  try {
    do {
      // Thread.interrupted() clears the flag; remember it so it can be restored on exit.
      interrupted = Thread.interrupted() || interrupted;

      if (retryInfo != null) {
        // A previous pass left members to retry: they become this pass's destinations.
        List<?> retryMembers = retryInfo.getMembers();
        destinations = retryMembers.toArray(new InternalDistributedMember[retryMembers.size()]);
        retryInfo = null;
        retry = true;
      }

      final List<Connection> cons = new ArrayList<>(destinations.length);
      ConnectExceptions ce = getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout,
          ackSDTimeout, cons);

      if (directReply && msg.getProcessorId() > 0) { // no longer a direct-reply message?
        directReply = false;
      }

      if (ce != null) {
        if (!retry) {
          // First pass: give these members one more chance.
          retryInfo = ce;
        } else if (failedCe != null) {
          // Retry pass: these members are considered unreachable; accumulate them.
          failedCe.getMembers().addAll(ce.getMembers());
          failedCe.getCauses().addAll(ce.getCauses());
        } else {
          failedCe = ce;
        }
      }

      if (cons.isEmpty()) {
        if (failedCe != null) {
          throw failedCe;
        }
        if (retryInfo != null) {
          // Jumps to the loop condition, which is true, so the retry pass starts.
          continue;
        }
        return bytesWritten;
      }

      if (logger.isDebugEnabled()) {
        logger.debug("{} on these {} connections: {}",
            (retry ? "Retrying send" : "Sending"), cons.size(), cons);
      }

      DMStats stats = getDMStats();
      List<?> sentCons; // used for cons we sent to this time
      final BaseMsgStreamer ms =
          MsgStreamer.create(cons, msg, directReply, stats, bufferPool);
      try {
        startTime = 0;
        if (ackTimeout > 0) {
          startTime = System.currentTimeMillis();
        }
        ms.reserveConnections(startTime, ackTimeout, ackSDTimeout);

        int result = ms.writeMessage();
        if (bytesWritten == 0) {
          // bytesWritten only needs to be set once.
          // if we have to do a retry we don't want to count
          // each one's bytes.
          bytesWritten = result;
        }
        ce = ms.getConnectExceptions();
        sentCons = ms.getSentConnections();
        totalSentCons.addAll(sentCons);
      } catch (NotSerializableException e) {
        // NotSerializableException is an IOException; rethrow it before the generic handler
        // below can wrap it.
        throw e;
      } catch (IOException ex) {
        throw new InternalGemFireException(
            "Unknown error serializing message",
            ex);
      } finally {
        try {
          ms.close();
        } catch (IOException e) {
          throw new InternalGemFireException("Unknown error serializing message", e);
        }
      }

      // Members the streamer failed to write to are scheduled for another pass.
      if (ce != null) {
        if (retryInfo != null) {
          retryInfo.getMembers().addAll(ce.getMembers());
          retryInfo.getCauses().addAll(ce.getCauses());
        } else {
          retryInfo = ce;
        }
        // Reset so the check after the ack phase only sees failures reported by readAcks.
        ce = null;
      }

      if (directReply && !sentCons.isEmpty()) {
        long readAckStart = 0;
        if (stats != null) {
          readAckStart = stats.startReplyWait();
        }
        try {
          ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce,
              directMsg.getDirectReplyProcessor());
        } finally {
          if (stats != null) {
            stats.endReplyWait(readAckStart, startTime);
          }
        }
      }

      // Members whose ack could not be read are scheduled for another pass as well.
      if (ce != null) {
        if (retryInfo != null) {
          retryInfo.getMembers().addAll(ce.getMembers());
          retryInfo.getCauses().addAll(ce.getCauses());
        } else {
          retryInfo = ce;
        }
      }

      if (retryInfo != null) {
        // Check the cancel criterion before starting another pass.
        conduit.getCancelCriterion().checkCancelInProgress(null);
      }
    } while (retryInfo != null);
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    // Release every connection we wrote to, whichever way the method is left.
    for (final Object totalSentCon : totalSentCons) {
      Connection con = (Connection) totalSentCon;
      con.setInUse(false, 0, 0, 0, null);
    }
  }

  if (failedCe != null) {
    throw failedCe;
  }
  return bytesWritten;
}