in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [319:504]
private void deliverCurrentMessageIfComplete()
{
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
{
MessagePublishInfo info = _currentMessage.getMessagePublishInfo();
String routingKey = AMQShortString.toString(info.getRoutingKey());
String exchangeName = AMQShortString.toString(info.getExchange());
try
{
final MessageDestination destination = _currentMessage.getDestination();
ContentHeaderBody contentHeader = _currentMessage.getContentHeader();
_connection.checkAuthorizedMessagePrincipal(AMQShortString.toString(contentHeader.getProperties().getUserId()));
_publishAuthCache.authorisePublish(destination, routingKey, info.isImmediate(), _connection.getLastReadTime());
if (_confirmOnPublish)
{
_confirmedMessageCounter++;
}
long bodySize = _currentMessage.getSize();
try
{
final MessageMetaData messageMetaData =
new MessageMetaData(info,
contentHeader,
getConnection().getLastReadTime());
final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
int bodyCount = _currentMessage.getBodyCount();
if (bodyCount > 0)
{
for (int i = 0; i < bodyCount; i++)
{
ContentBody contentChunk = _currentMessage.getContentChunk(i);
handle.addContent(contentChunk.getPayload());
contentChunk.dispose();
}
}
final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
final AMQMessage amqMessage = new AMQMessage(storedMessage, _connection.getReference());
try (MessageReference reference = amqMessage.newReference())
{
_currentMessage = null;
final InstanceProperties instanceProperties = prop ->
{
switch (prop)
{
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
return amqMessage.isImmediate();
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
return amqMessage.isMandatory();
case REDELIVERED:
return false;
}
return null;
};
final RoutingResult<AMQMessage> result =
destination.route(amqMessage,
amqMessage.getInitialRoutingAddress(),
instanceProperties);
int enqueues = result.send(_transaction, amqMessage.isImmediate() ? _immediateAction : null);
if (enqueues == 0)
{
boolean mandatory = amqMessage.isMandatory();
boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Unroutable message exchange='{}', routing key='{}', mandatory={},"
+ " transactionalSession={}, closeOnNoRoute={}, confirmOnPublish={}",
exchangeName,
routingKey,
mandatory,
isTransactional(),
closeOnNoRoute,
_confirmOnPublish);
}
int errorCode = ErrorCodes.NO_ROUTE;
String errorMessage = String.format("No route for message with exchange '%s' and routing key '%s'",
exchangeName,
routingKey);
if (result.containsReject(RejectType.LIMIT_EXCEEDED))
{
errorCode = ErrorCodes.RESOURCE_ERROR;
errorMessage = errorMessage + ":" + result.getRejectReason();
}
if (mandatory
&& isTransactional()
&& !_confirmOnPublish
&& _connection.isCloseWhenNoRoute())
{
_connection.sendConnectionClose(errorCode, errorMessage, _channelId);
}
else
{
if (mandatory || amqMessage.isImmediate())
{
if (_confirmOnPublish)
{
_connection.writeFrame(new AMQFrame(_channelId,
new BasicNackBody(_confirmedMessageCounter,
false,
false)));
}
_transaction.addPostTransactionAction(new WriteReturnAction(errorCode,
errorMessage,
amqMessage));
}
else
{
if (_confirmOnPublish)
{
_connection.writeFrame(new AMQFrame(_channelId,
new BasicAckBody(_confirmedMessageCounter,
false)));
}
message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey));
}
}
}
else
{
if (_confirmOnPublish)
{
recordFuture(CompletableFuture.completedFuture(null),
new ServerTransaction.Action()
{
private final long _deliveryTag = _confirmedMessageCounter;
@Override
public void postCommit()
{
BasicAckBody body = _connection.getMethodRegistry()
.createBasicAckBody(
_deliveryTag, false);
_connection.writeFrame(body.generateFrame(_channelId));
}
@Override
public void onRollback()
{
final BasicNackBody body = new BasicNackBody(_deliveryTag,
false,
false);
_connection.writeFrame(new AMQFrame(_channelId, body));
}
});
}
}
}
}
finally
{
registerMessageReceived(bodySize);
if (isTransactional())
{
registerTransactedMessageReceived();
}
_currentMessage = null;
}
}
catch (AccessControlException e)
{
_connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
}