in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java [475:590]
/**
 * Sends {@code packet} over this channel and blocks the calling thread until a matching
 * response arrives, the channel is closed, or {@code timeout} milliseconds have elapsed.
 * <p>
 * The wait ends when the received response is an {@code EXCEPTION} packet, or when its type
 * equals {@code expectedPacket} and its correlation ID equals the one assigned to
 * {@code packet} by this method.
 * <p>
 * Only one blocking send runs at a time per channel: the whole send/wait sequence is
 * performed while holding {@code sendBlockingLock}.
 *
 * @param packet         the packet to send; its channel ID and correlation ID are overwritten here
 * @param reconnectID    value handed to {@code checkReconnectID} right before the write
 * @param expectedPacket the packet type the response must carry for the wait to end
 * @param timeout        maximum time to wait for the response, in milliseconds; {@code -1} is rejected
 * @param failOnTimeout  when {@code false} and no response at all was received, {@code null} is
 *                       returned instead of failing the connection and throwing
 * @return the received response, or {@code null} only when {@code failOnTimeout} is {@code false}
 *         and no response was received
 * @throws ActiveMQException            if an interceptor rejects the packet, the channel is (or becomes)
 *                                      closed, the wait times out, or the response is an
 *                                      {@code EXCEPTION} packet (its carried exception is rethrown)
 * @throws IllegalStateException        if {@code timeout} is {@code -1}
 * @throws ActiveMQInterruptedException if the calling thread is interrupted while waiting
 */
public Packet sendBlocking(final Packet packet,
                           final int reconnectID,
                           final byte expectedPacket,
                           final long timeout,
                           final boolean failOnTimeout) throws ActiveMQException {
   String interceptionResult = invokeInterceptors(packet, interceptors, connection);
   if (interceptionResult != null) {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID={} interceptionResult={}", connection.getID(), interceptionResult);
      }
      // if we don't throw an exception here the client might not unblock
      throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
   }
   if (closed) {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID={} closed.", connection.getID());
      }
      throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
   }
   if (timeout == -1) {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID={} Cannot do a blocking call timeout on a server side connection", connection.getID());
      }
      throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
   }
   // Synchronized since can't be called concurrently by more than one thread and this can occur
   // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
   synchronized (sendBlockingLock) {
      packet.setChannelID(id);
      // The correlation ID is what ties the eventual response back to this particular request.
      packet.setCorrelationID(blockingCorrelationID.decrementAndGet());
      // Encode before taking the channel lock so the lock is not held during encoding.
      final ActiveMQBuffer buffer = packet.encode(connection);
      lock.lock();
      try {
         if (failingOver) {
            // A fail-over is in progress: delegate to waitForFailOver before touching the connection.
            waitForFailOver("RemotingConnectionID=" + connection.getID() + " timed-out waiting for fail-over condition on blocking send");
         }
         // Drop any response left over from a previous call; the wait loop below watches this field.
         response = null;
         if (resendCache != null && packet.isRequiresConfirmations()) {
            addResendPacket(packet);
         }
         checkReconnectID(reconnectID);
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} Sending blocking {}", connection.getID(), packet);
         }
         connection.getTransportConnection().write(buffer, false, false);

         long toWait = timeout;
         // Elapsed time is measured with the monotonic clock so that wall-clock adjustments can
         // neither cut the wait short (which would fail the connection below) nor extend it.
         final long startNanos = System.nanoTime();
         while (!closed && (response == null || (response.getType() != EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) {
            try {
               sendCondition.await(toWait, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
               throw new ActiveMQInterruptedException(e);
            }
            if (response != null && response.getType() != EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
               ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
            }
            if (closed) {
               // Leave toWait untouched (still > 0) so the "closed while waiting" check below fires.
               break;
            }
            // Recompute from the fixed start point instead of accumulating per-iteration deltas,
            // so sub-millisecond wake-ups cannot make the remaining time drift.
            toWait = timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         }

         // Snapshot the field while the lock is still held. This method only ever nulls the field,
         // so it is assigned by another thread; reading it again after unlock() could hand the
         // caller a packet that was never validated below.
         final Packet received = response;

         if (closed && toWait > 0 && received == null) {
            // The channel was closed before the timeout expired and before any response arrived.
            Throwable cause = ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
            throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
         }
         if (!failOnTimeout && received == null) {
            return null;
         }
         if (received == null || (received.getType() != EXCEPTION && received.getCorrelationID() != packet.getCorrelationID())) {
            // Nothing usable arrived in time: report the timeout and ask the connection to fail asynchronously.
            ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(timeout, packet.getType());
            connection.asyncFail(e);
            throw e;
         }
         if (received.getType() == EXCEPTION) {
            final ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) received;
            ActiveMQException e = mem.getException();
            // Re-fill the stack trace so it points at this call site.
            e.fillInStackTrace();
            throw e;
         }
         return received;
      } finally {
         lock.unlock();
      }
   }
}