in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java [425:558]
/**
 * Processes a {@link MessageTransfer} received on the given session.
 *
 * <p>The transfer is handled as follows:
 * <ol>
 *   <li>If {@code ssn.blockingTimeoutExceeded()} is true, a
 *       {@code FLOW_CONTROL_IGNORED} event is logged and the session is closed.</li>
 *   <li>If the body size exceeds the connection's maximum message size, the transfer is
 *       reported through {@code exception(...)} with {@code RESOURCE_LIMIT_EXCEEDED}.</li>
 *   <li>Otherwise the destination is resolved, an expiration is derived from the TTL when
 *       only a TTL was supplied, publish authorisation is checked, the message is stored
 *       and enqueued, and an unroutable / rejected result is handled (message reject,
 *       session close, or discard with an operational log entry).</li>
 * </ol>
 *
 * <p>{@code xfr.dispose()} is always invoked before this method returns, and the message
 * reference taken for routing is always released.
 *
 * @param ssn the session on which the transfer arrived
 * @param xfr the transfer command being processed
 */
public void messageTransfer(ServerSession ssn, final MessageTransfer xfr)
{
    try
    {
        if(ssn.blockingTimeoutExceeded())
        {
            getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED());
            ssn.close(ErrorCodes.MESSAGE_TOO_LARGE,
                      "Session flow control was requested, but not enforced by sender");
        }
        else if(xfr.getBodySize() > ssn.getConnection().getMaxMessageSize())
        {
            exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
                      "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + ssn.getConnection().getMaxMessageSize());
        }
        else
        {
            final MessageDestination destination = getDestinationForMessage(ssn, xfr);

            // May be null: the transfer is not required to carry a header.
            final DeliveryProperties delvProps =
                    xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();

            // Derive an absolute expiration from the TTL when the sender supplied only a TTL.
            if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
            {
                delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
            }

            final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
            final NamedAddressSpace virtualHost = getAddressSpace(ssn);

            try
            {
                ssn.getAMQPConnection().checkAuthorizedMessagePrincipal(getMessageUserId(xfr));
                ssn.authorisePublish(destination, messageMetaData.getRoutingKey(), messageMetaData.isImmediate(), ssn
                        .getAMQPConnection().getLastReadTime());
            }
            catch (AccessControlException e)
            {
                ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
                exception(ssn, xfr, errorCode, e.getMessage());
                return;
            }

            final MessageStore store = virtualHost.getMessageStore();
            final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
            final MessageTransferMessage message =
                    new MessageTransferMessage(storeMessage, ssn.getReference());

            // Hold a reference for the duration of routing; released in the finally block below.
            MessageReference<MessageTransferMessage> reference = message.newReference();
            try
            {
                final InstanceProperties instanceProperties = prop ->
                {
                    switch (prop)
                    {
                        case EXPIRATION:
                            return message.getExpiration();
                        case IMMEDIATE:
                            return message.isImmediate();
                        case MANDATORY:
                            return (delvProps == null || !delvProps.getDiscardUnroutable())
                                   && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
                        case PERSISTENT:
                            return message.isPersistent();
                        case REDELIVERED:
                            // delvProps may be null (no header / no delivery properties);
                            // guard the dereference and report such a message as not redelivered.
                            return delvProps != null && delvProps.getRedelivered();
                    }
                    return null;
                };

                RoutingResult<MessageTransferMessage> routingResult = ssn.enqueue(message, instanceProperties, destination);
                boolean explicitlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);

                if (!routingResult.hasRoutes() || explicitlyRejected)
                {
                    boolean closeWhenNoRoute = ssn.getAMQPConnection().getPort().getCloseWhenNoRoute();
                    boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();

                    if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                    {
                        // Sender asked for explicit acceptance and did not opt in to discarding:
                        // tell it the message was rejected as unroutable.
                        RangeSet rejects = RangeSetFactory.createRangeSet();
                        rejects.add(xfr.getId());
                        MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
                        ssn.invoke(reject);
                    }
                    else if (!discardUnroutable && closeWhenNoRoute && explicitlyRejected)
                    {
                        // Limit-exceeded rejection with close-when-no-route enabled on the port:
                        // send an execution exception and close the session. The command is
                        // intentionally not marked as processed on this path.
                        ExecutionErrorCode code = ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED;
                        String errorMessage = String.format("No route for message with destination '%s' and routing key '%s' : %s",
                                                            xfr.getDestination(),
                                                            message.getInitialRoutingAddress(),
                                                            routingResult.getRejectReason());
                        ExecutionException ex = new ExecutionException();
                        ex.setErrorCode(code);
                        ex.setDescription(errorMessage);
                        ssn.invoke(ex);
                        ssn.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
                        return;
                    }
                    else
                    {
                        // Nothing is sent to the peer; record the discard in the operational log.
                        getEventLogger(ssn).message(ExchangeMessages.DISCARDMSG(destination.getName(),
                                                                                messageMetaData.getRoutingKey()));
                    }
                }

                // TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
                if (ssn.isTransactional())
                {
                    ssn.processed(xfr);
                }
                else
                {
                    ssn.recordFuture(Futures.immediateFuture(null),
                                     new CommandProcessedAction(ssn, xfr));
                }
            }
            catch (VirtualHostUnavailableException e)
            {
                getServerConnection(ssn).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
            }
            finally
            {
                reference.release();
            }
        }
    }
    finally
    {
        xfr.dispose();
    }
}