public Packet sendBlocking()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java [475:590]


   /**
    * Sends {@code packet} on this channel and blocks the calling thread until a usable response
    * is available, the channel is closed, or {@code timeout} milliseconds have elapsed.
    * <p>
    * The wait ends early when {@code response} is an {@code EXCEPTION} packet, or when its type
    * equals {@code expectedPacket} and its correlation ID equals the one assigned to
    * {@code packet} by this call.
    *
    * @param packet         the packet to send; its channel ID and correlation ID are overwritten
    *                       here before it is encoded
    * @param reconnectID    handed to {@code checkReconnectID} just before the packet is written
    * @param expectedPacket the packet type the caller expects as the response
    * @param timeout        maximum time to wait for the response, in milliseconds; {@code -1} is
    *                       rejected with an {@link IllegalStateException}
    * @param failOnTimeout  when {@code false} and no response at all was received, {@code null}
    *                       is returned instead of failing the connection
    * @return the response packet, or {@code null} when {@code failOnTimeout} is {@code false} and
    *         nothing was received
    * @throws ActiveMQException            when the wait ends without a response (or with a
    *                                      non-exception response carrying a different
    *                                      correlation ID), in which case
    *                                      {@code connection.asyncFail} is also invoked; or when
    *                                      the response is an {@code EXCEPTION} packet, in which
    *                                      case the exception it carries is thrown. Interceptor
    *                                      rejection and a closed channel are reported with
    *                                      exceptions built by {@code ActiveMQClientMessageBundle}
    * @throws IllegalStateException        if {@code timeout} is {@code -1}
    * @throws ActiveMQInterruptedException if the thread is interrupted while waiting
    */
   public Packet sendBlocking(final Packet packet,
                              final int reconnectID,
                              final byte expectedPacket,
                              final long timeout,
                              final boolean failOnTimeout) throws ActiveMQException {
      String interceptionResult = invokeInterceptors(packet, interceptors, connection);

      if (interceptionResult != null) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} interceptionResult={}", connection.getID(), interceptionResult);
         }
         // if we don't throw an exception here the client might not unblock
         throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
      }

      if (closed) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} closed.", connection.getID());
         }
         throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
      }

      if (timeout == -1) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} Cannot do a blocking call timeout on a server side connection", connection.getID());
         }
         throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
      }

      // Synchronized since this can't be called concurrently by more than one thread, and that can occur,
      // e.g. a blocking acknowledge() from inside a message handler at the same time as another operation
      // on the main thread.
      synchronized (sendBlockingLock) {
         packet.setChannelID(id);

         packet.setCorrelationID(blockingCorrelationID.decrementAndGet());

         // Encoding happens after the IDs are set and before "lock" is taken.
         final ActiveMQBuffer buffer = packet.encode(connection);

         lock.lock();

         try {
            if (failingOver) {
               waitForFailOver("RemotingConnectionID=" + connection.getID() + " timed-out waiting for fail-over condition on blocking send");
            }

            // Discard whatever a previous call left behind so the loop below only reacts to a new value.
            response = null;

            if (resendCache != null && packet.isRequiresConfirmations()) {
               addResendPacket(packet);
            }

            checkReconnectID(reconnectID);

            if (logger.isTraceEnabled()) {
               logger.trace("RemotingConnectionID={} Sending blocking {}", connection.getID(), packet);
            }

            connection.getTransportConnection().write(buffer, false, false);

            long toWait = timeout;

            // Elapsed time is measured with the monotonic clock so that wall-clock adjustments
            // cannot shorten or prolong the wait.
            final long start = System.nanoTime();

            while (!closed && (response == null || (response.getType() != EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) {
               try {
                  sendCondition.await(toWait, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
                  throw new ActiveMQInterruptedException(e);
               }

               if (response != null && response.getType() != EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
                  ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
               }

               if (closed) {
                  // Leave toWait untouched: the check after the loop uses "toWait > 0" to tell
                  // a close during the wait apart from a plain timeout.
                  break;
               }

               // (nanoTime() - start) is safe against numeric overflow of the clock value.
               toWait = timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            }

            // Snapshot the field once while "lock" is still held, so every check below and the
            // value handed back to the caller refer to the same packet.
            // NOTE(review): presumably the packet-receiving thread assigns "response" before
            // signalling sendCondition - confirm against the handler that does so.
            final Packet result = response;

            if (closed && toWait > 0 && result == null) {
               Throwable cause = ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
               throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
            }

            if (!failOnTimeout && result == null) {
               return null;
            }

            if (result == null || (result.getType() != EXCEPTION && result.getCorrelationID() != packet.getCorrelationID())) {
               ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(timeout, packet.getType());
               connection.asyncFail(e);
               throw e;
            }

            if (result.getType() == EXCEPTION) {
               final ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) result;

               ActiveMQException e = mem.getException();

               // Re-base the stack trace on the calling thread before rethrowing.
               e.fillInStackTrace();

               throw e;
            }

            return result;
         } finally {
            lock.unlock();
         }
      }
   }