public void messageTransfer()

in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java [425:558]


    /**
     * Handles an incoming message transfer command received on the given session.
     * <p>
     * The steps, in order:
     * <ol>
     *   <li>If {@code ssn.blockingTimeoutExceeded()} is true, a flow-control-ignored event is logged
     *       and the session is closed.</li>
     *   <li>If the transfer's body size is greater than the connection's maximum message size, an
     *       execution exception with {@code RESOURCE_LIMIT_EXCEEDED} is raised for the command.</li>
     *   <li>Otherwise the destination is resolved, an expiration is derived from the TTL when the
     *       sender supplied a TTL but no expiration, the publish is authorised, and the message is
     *       stored and enqueued. A message with no routes, or one rejected with
     *       {@code LIMIT_EXCEEDED}, is either rejected back to the sender, causes the session to be
     *       closed, or is logged as discarded, depending on the delivery properties, the accept mode
     *       and the port's close-when-no-route setting.</li>
     * </ol>
     * The transfer is always disposed before this method returns, and the message reference taken
     * for enqueueing is always released.
     *
     * @param ssn the session on which the transfer arrived
     * @param xfr the transfer command; disposed by this method
     */
    public void messageTransfer(ServerSession ssn, final MessageTransfer xfr)
    {
        try
        {
            if(ssn.blockingTimeoutExceeded())
            {
                getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED());

                ssn.close(ErrorCodes.MESSAGE_TOO_LARGE,
                          "Session flow control was requested, but not enforced by sender");
            }
            else if(xfr.getBodySize() > ssn.getConnection().getMaxMessageSize())
            {
                exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
                          "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + ssn.getConnection().getMaxMessageSize());
            }
            else
            {
                final MessageDestination destination = getDestinationForMessage(ssn, xfr);

                // Delivery properties are optional: they are null when the transfer carries no header.
                final DeliveryProperties delvProps =
                        xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
                // Convert a relative TTL into an absolute expiration unless the sender already set one.
                if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
                {
                    delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
                }

                final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);

                final NamedAddressSpace virtualHost = getAddressSpace(ssn);
                try
                {
                    ssn.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
                    ssn.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), ssn
                            .getAMQPConnection().getLastReadTime());

                }
                catch (AccessControlException e)
                {
                    ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
                    exception(ssn, xfr, errorCode, e.getMessage());

                    // Nothing has been stored yet; the outer finally still disposes the transfer.
                    return;
                }

                final MessageStore store = virtualHost.getMessageStore();
                final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
                final MessageTransferMessage message =
                        new MessageTransferMessage(storeMessage, ssn.getReference());
                MessageReference<MessageTransferMessage> reference = message.newReference();

                try
                {
                    final InstanceProperties instanceProperties = prop ->
                    {
                        switch (prop)
                        {
                            case EXPIRATION:
                                return message.getExpiration();
                            case IMMEDIATE:
                                return message.isImmediate();
                            case MANDATORY:
                                return (delvProps == null || !delvProps.getDiscardUnroutable())
                                       && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
                            case PERSISTENT:
                                return message.isPersistent();
                            case REDELIVERED:
                                // delvProps is null for a header-less transfer; report "not redelivered"
                                // rather than failing with a NullPointerException.
                                return delvProps != null && delvProps.getRedelivered();
                        }
                        return null;
                    };

                    RoutingResult<MessageTransferMessage> routingResult = ssn.enqueue(message, instanceProperties, destination);

                    boolean explicitlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
                    if (!routingResult.hasRoutes() || explicitlyRejected)
                    {
                        boolean closeWhenNoRoute = ssn.getAMQPConnection().getPort().getCloseWhenNoRoute();
                        boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
                        if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                        {
                            // Sender asked for explicit acceptance: tell it the message was not routed.
                            RangeSet rejects = RangeSetFactory.createRangeSet();
                            rejects.add(xfr.getId());
                            MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
                            ssn.invoke(reject);
                        }
                        else if (!discardUnroutable && closeWhenNoRoute && explicitlyRejected)
                        {
                            ExecutionErrorCode code = ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED;
                            String errorMessage = String.format("No route for message with destination '%s' and routing key '%s' : %s",
                                                                xfr.getDestination(),
                                                                message.getInitialRoutingAddress(),
                                                                routingResult.getRejectReason());

                            ExecutionException ex = new ExecutionException();
                            ex.setErrorCode(code);
                            ex.setDescription(errorMessage);
                            ssn.invoke(ex);
                            ssn.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
                            // The session is closed, so the command is not marked as processed.
                            return;
                        }
                        else
                        {
                            getEventLogger(ssn).message(ExchangeMessages.DISCARDMSG(destination.getName(),
                                                                                    messageMetaData.getRoutingKey()));
                        }
                    }

                    // TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
                    if (ssn.isTransactional())
                    {
                        ssn.processed(xfr);
                    }
                    else
                    {
                        ssn.recordFuture(CompletableFuture.completedFuture(null),
                                         new CommandProcessedAction(ssn, xfr));
                    }
                }
                catch (VirtualHostUnavailableException e)
                {
                    getServerConnection(ssn).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                }
                finally
                {
                    // Drop the reference taken above on every exit path, including the early return.
                    reference.release();
                }
            }
        }
        finally
        {
            xfr.dispose();
        }
    }