public Packet sendBlocking()

in artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java [461:570]


   /**
    * Sends {@code packet} on this channel and blocks the calling thread until a matching
    * response is received, the channel is closed, or the connection's blocking-call timeout
    * has elapsed.
    * <p>
    * A response ends the wait when its type is {@code PacketImpl.EXCEPTION}, or when its type
    * equals {@code expectedPacket} and its correlation ID equals the correlation ID assigned to
    * {@code packet} by this method.
    * <p>
    * The elapsed wait is measured with {@link System#nanoTime()} so that wall-clock adjustments
    * cannot shorten or lengthen the effective timeout.
    *
    * @param packet         the packet to send; its channel ID and correlation ID are set here
    * @param reconnectID    value handed to {@code checkReconnectID} before the packet is written
    * @param expectedPacket the packet type accepted as the answer to this call
    * @return the response packet that ended the wait
    * @throws ActiveMQException            if an interceptor rejects the packet, the channel is
    *                                      closed, no response arrives before the timeout (the
    *                                      connection is then also failed via {@code asyncFail}),
    *                                      or the response is an exception packet
    * @throws ActiveMQInterruptedException if the thread is interrupted while waiting
    * @throws IllegalStateException        if the connection's blocking-call timeout is -1
    */
   public Packet sendBlocking(final Packet packet,
                              final int reconnectID,
                              byte expectedPacket) throws ActiveMQException {
      String interceptionResult = invokeInterceptors(packet, interceptors, connection);

      if (interceptionResult != null) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} interceptionResult={}", (connection == null ? "NULL" : connection.getID()), interceptionResult);
         }
         // if we don't throw an exception here the client might not unblock
         throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
      }

      if (closed) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} closed.", (connection == null ? "NULL" : connection.getID()));
         }
         throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
      }

      if (connection.getBlockingCallTimeout() == -1) {
         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID={} Cannot do a blocking call timeout on a server side connection", (connection == null ? "NULL" : connection.getID()));
         }
         throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
      }

      // Synchronized because a blocking send must not run concurrently on more than one thread, and that
      // can happen, e.g. a blocking acknowledge() from inside a message handler at the same time as
      // another operation on the main thread.
      synchronized (sendBlockingLock) {
         packet.setChannelID(id);

         packet.setCorrelationID(blockingCorrelationID.decrementAndGet());

         final ActiveMQBuffer buffer = packet.encode(connection);

         // Snapshot of the response field, taken while 'lock' is held, so the packet that is
         // validated below is exactly the packet that is returned after the lock is released.
         final Packet result;

         lock.lock();

         try {
            if (failingOver) {
               waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on blocking send");
            }

            // Discard any earlier response before this request goes out.
            response = null;

            if (resendCache != null && packet.isRequiresConfirmations()) {
               addResendPacket(packet);
            }

            checkReconnectID(reconnectID);

            if (logger.isTraceEnabled()) {
               logger.trace("RemotingConnectionID={} Sending blocking {}", (connection == null ? "NULL" : connection.getID()), packet);
            }

            connection.getTransportConnection().write(buffer, false, false);

            // Remaining wait budget in nanoseconds; toNanos saturates instead of overflowing.
            long toWaitNanos = TimeUnit.MILLISECONDS.toNanos(connection.getBlockingCallTimeout());

            // Monotonic clock: immune to wall-clock changes while we wait.
            long start = System.nanoTime();

            while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWaitNanos > 0) {
               try {
                  sendCondition.await(toWaitNanos, TimeUnit.NANOSECONDS);
               } catch (InterruptedException e) {
                  throw new ActiveMQInterruptedException(e);
               }

               // A non-async response of an unexpected type is logged; the loop condition decides
               // whether waiting continues.
               if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket && !response.isResponseAsync()) {
                  ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
               }

               if (closed) {
                  // Leave before charging the elapsed time, so the check below can still tell
                  // "closed while waiting" apart from "timed out".
                  break;
               }

               final long now = System.nanoTime();

               toWaitNanos -= now - start;

               start = now;
            }

            if (closed && toWaitNanos > 0 && response == null) {
               Throwable cause = ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
               throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
            }

            result = response;

            if (result == null) {
               ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType());
               connection.asyncFail(e);
               throw e;
            }

            if (result.getType() == PacketImpl.EXCEPTION) {
               final ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) result;

               ActiveMQException e = mem.getException();

               // Replace the stack trace carried by the exception with the caller's own.
               e.fillInStackTrace();

               throw e;
            }
         } finally {
            lock.unlock();
         }

         return result;
      }
   }