private void deliverCurrentMessageIfComplete()

in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [319:504]


    /**
     * Publishes the message currently being assembled on this channel, provided all of
     * its content has been received; otherwise does nothing.
     * <p>
     * The steps, in order, are:
     * <ol>
     *   <li>check the message's user id and the publish permission for the destination;</li>
     *   <li>if publisher confirms are enabled, advance the confirmed message counter;</li>
     *   <li>add the message meta data and content chunks to the message store;</li>
     *   <li>route the message and send the routing result within the current transaction;</li>
     *   <li>if nothing was enqueued, apply the unroutable-message handling, otherwise
     *       (in confirm mode) record the action that acks/nacks the publish.</li>
     * </ol>
     * Once the authorisation checks have passed, the received-message statistics are
     * recorded and {@code _currentMessage} is cleared even if storing or routing fails.
     * An {@link AccessControlException} raised inside the guarded region results in the
     * connection being closed with {@code ErrorCodes.ACCESS_REFUSED}.
     */
    private void deliverCurrentMessageIfComplete()
    {
        // check and deliver if header says body length is zero
        if (!_currentMessage.allContentReceived())
        {
            return;
        }

        final MessagePublishInfo info = _currentMessage.getMessagePublishInfo();
        final String routingKey = AMQShortString.toString(info.getRoutingKey());
        final String exchangeName = AMQShortString.toString(info.getExchange());

        try
        {
            final MessageDestination destination = _currentMessage.getDestination();

            final ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
            _connection.checkAuthorizedMessagePrincipal(AMQShortString.toString(contentHeader.getProperties().getUserId()));

            _publishAuthCache.authorisePublish(destination, routingKey, info.isImmediate(), _connection.getLastReadTime());

            // The counter is only advanced after both authorisation checks have passed.
            if (_confirmOnPublish)
            {
                _confirmedMessageCounter++;
            }

            // Captured up front: _currentMessage is cleared before the finally block runs.
            final long bodySize = _currentMessage.getSize();
            try
            {
                final StoredMessage<MessageMetaData> storedMessage = storeCurrentMessage(info, contentHeader);

                final AMQMessage amqMessage = new AMQMessage(storedMessage, _connection.getReference());
                try (MessageReference reference = amqMessage.newReference())
                {
                    // Cleared before routing, exactly as in the original statement order.
                    _currentMessage = null;

                    final InstanceProperties instanceProperties = createInstanceProperties(amqMessage);

                    final RoutingResult<AMQMessage> result =
                            destination.route(amqMessage,
                                              amqMessage.getInitialRoutingAddress(),
                                              instanceProperties);

                    final int enqueues = result.send(_transaction, amqMessage.isImmediate() ? _immediateAction : null);
                    if (enqueues == 0)
                    {
                        handleUnroutableMessage(amqMessage, result, exchangeName, routingKey);
                    }
                    else if (_confirmOnPublish)
                    {
                        recordPublishConfirmation();
                    }
                }
            }
            finally
            {
                registerMessageReceived(bodySize);
                if (isTransactional())
                {
                    registerTransactedMessageReceived();
                }
                _currentMessage = null;
            }
        }
        catch (AccessControlException e)
        {
            _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
        }
    }

    /**
     * Adds the current message's meta data and every received content chunk to the
     * message store, disposing of each chunk once its payload has been added.
     *
     * @param info          publish information of the current message
     * @param contentHeader content header of the current message
     * @return the stored message returned by the store handle once all content is added
     */
    private StoredMessage<MessageMetaData> storeCurrentMessage(final MessagePublishInfo info,
                                                               final ContentHeaderBody contentHeader)
    {
        final MessageMetaData messageMetaData =
                new MessageMetaData(info,
                                    contentHeader,
                                    getConnection().getLastReadTime());

        final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
        final int bodyCount = _currentMessage.getBodyCount();
        for (int i = 0; i < bodyCount; i++)
        {
            final ContentBody contentChunk = _currentMessage.getContentChunk(i);
            handle.addContent(contentChunk.getPayload());
            contentChunk.dispose();
        }
        return handle.allContentAdded();
    }

    /**
     * Creates the instance properties handed to the destination when routing the message.
     * Each property is read from the message on demand; {@code REDELIVERED} is always
     * {@code false}.
     *
     * @param amqMessage the message being routed
     * @return instance properties backed by the given message
     */
    private InstanceProperties createInstanceProperties(final AMQMessage amqMessage)
    {
        return prop ->
        {
            switch (prop)
            {
                case EXPIRATION:
                    return amqMessage.getExpiration();
                case IMMEDIATE:
                    return amqMessage.isImmediate();
                case PERSISTENT:
                    return amqMessage.isPersistent();
                case MANDATORY:
                    return amqMessage.isMandatory();
                case REDELIVERED:
                    return false;
                default:
                    return null;
            }
        };
    }

    /**
     * Handles a message for which sending the routing result enqueued nothing.
     * <ul>
     *   <li>If the message is mandatory, the channel is transactional, publisher confirms
     *       are off and the connection is set to close when there is no route, the
     *       connection is closed.</li>
     *   <li>Otherwise, if the message is mandatory or immediate, a nack is written (confirm
     *       mode only) and a {@code WriteReturnAction} is added as a post transaction
     *       action.</li>
     *   <li>Otherwise an ack is written (confirm mode only) and
     *       {@code ExchangeMessages.DISCARDMSG} is passed to {@code message(...)}.</li>
     * </ul>
     * The error code is {@code ErrorCodes.NO_ROUTE}, or {@code ErrorCodes.RESOURCE_ERROR}
     * with the reject reason appended to the text when the result contains a
     * {@code LIMIT_EXCEEDED} reject.
     *
     * @param amqMessage   the message that could not be enqueued
     * @param result       the routing result that produced no enqueues
     * @param exchangeName exchange name taken from the publish information
     * @param routingKey   routing key taken from the publish information
     */
    private void handleUnroutableMessage(final AMQMessage amqMessage,
                                         final RoutingResult<AMQMessage> result,
                                         final String exchangeName,
                                         final String routingKey)
    {
        final boolean mandatory = amqMessage.isMandatory();
        final boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();

        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Unroutable message exchange='{}', routing key='{}', mandatory={},"
                    + " transactionalSession={}, closeOnNoRoute={}, confirmOnPublish={}",
                    exchangeName,
                    routingKey,
                    mandatory,
                    isTransactional(),
                    closeOnNoRoute,
                    _confirmOnPublish);
        }

        int errorCode = ErrorCodes.NO_ROUTE;
        String errorMessage = String.format("No route for message with exchange '%s' and routing key '%s'",
                                            exchangeName,
                                            routingKey);
        if (result.containsReject(RejectType.LIMIT_EXCEEDED))
        {
            errorCode = ErrorCodes.RESOURCE_ERROR;
            errorMessage = errorMessage + ":" + result.getRejectReason();
        }

        // Uses the value that was logged above so the log and the decision cannot disagree.
        if (mandatory && isTransactional() && !_confirmOnPublish && closeOnNoRoute)
        {
            _connection.sendConnectionClose(errorCode, errorMessage, _channelId);
        }
        else if (mandatory || amqMessage.isImmediate())
        {
            // The nack is written immediately; the return is deferred to the transaction.
            if (_confirmOnPublish)
            {
                _connection.writeFrame(new AMQFrame(_channelId,
                                                    new BasicNackBody(_confirmedMessageCounter,
                                                                      false,
                                                                      false)));
            }
            _transaction.addPostTransactionAction(new WriteReturnAction(errorCode,
                                                                        errorMessage,
                                                                        amqMessage));
        }
        else
        {
            if (_confirmOnPublish)
            {
                _connection.writeFrame(new AMQFrame(_channelId,
                                                    new BasicAckBody(_confirmedMessageCounter,
                                                                     false)));
            }
            message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
        }
    }

    /**
     * Records, against an already completed future, the action that confirms the
     * message just published: an ack is written on commit and a nack on rollback.
     * The delivery tag is the value of the confirmed message counter at the time this
     * method is called.
     */
    private void recordPublishConfirmation()
    {
        recordFuture(CompletableFuture.completedFuture(null),
                     new ServerTransaction.Action()
                     {
                         // Snapshot taken at construction; the counter moves on with later publishes.
                         private final long _deliveryTag = _confirmedMessageCounter;

                         @Override
                         public void postCommit()
                         {
                             BasicAckBody body = _connection.getMethodRegistry()
                                                            .createBasicAckBody(_deliveryTag, false);
                             _connection.writeFrame(body.generateFrame(_channelId));
                         }

                         @Override
                         public void onRollback()
                         {
                             final BasicNackBody body = new BasicNackBody(_deliveryTag,
                                                                          false,
                                                                          false);
                             _connection.writeFrame(new AMQFrame(_channelId, body));
                         }
                     });
    }