in client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java [110:396]
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
boolean immediate, final long deliveryDelay) throws JMSException
{
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
String routingKey = destination.getRoutingKey();
FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
(destination.getSubject() != null
|| (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
{
if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
{
// use default subject in address string
headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
}
if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
{
routingKey = headers.getString(QpidMessageProperties.QPID_SUBJECT);
}
}
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
destination.getExchangeName(),
routingKey,
mandatory,
immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
contentHeaderProperties.setUserId(getUserID());
//Set the JMS_QPID_DESTTYPE for 0-8/9 messages
int type;
if (destination instanceof Topic)
{
type = AMQDestination.TOPIC_TYPE;
}
else if (destination instanceof Queue)
{
type = AMQDestination.QUEUE_TYPE;
}
else
{
type = AMQDestination.UNKNOWN_TYPE;
}
//Set JMS_QPID_DESTTYPE
delegate.getContentHeaderProperties()
.getHeaders()
.setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
long currentTime;
if (!isDisableTimestamps())
{
currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
if (timeToLive > 0)
{
if (!SET_EXPIRATION_AS_TTL)
{
//default behaviour used by Qpid
contentHeaderProperties.setExpiration(currentTime + timeToLive);
}
else
{
//alternative behaviour for brokers interpreting the expiration header directly as a TTL.
contentHeaderProperties.setExpiration(timeToLive);
}
}
else
{
contentHeaderProperties.setExpiration(0);
}
}
else
{
currentTime = 0L;
}
if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null)
{
if(currentTime == 0L)
{
currentTime = System.currentTimeMillis();
}
headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
}
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
int size = (payload != null) ? payload.remaining() : 0;
AMQFrame contentHeaderFrame;
final AMQFrame[] frames;
boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted();
if(encrypt)
{
MessageEncryptionHelper encryptionHelper = getSession().getMessageEncryptionHelper();
try
{
SecretKeySpec secretKey = encryptionHelper.createSecretKey();
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_HEADER);
String recipientString = message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
if(recipientString == null)
{
recipientString = destination.getEncryptedRecipients();
}
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
String unencryptedProperties = message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
byte[] unencryptedBytes = new byte[headerLength + size];
ByteBuffer output = ByteBuffer.wrap(unencryptedBytes);
output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
contentHeaderProperties.writePropertyListPayload(output);
if (size != 0)
{
payload.get(unencryptedBytes, headerLength, payload.remaining());
}
byte[] ivbytes = encryptionHelper.getInitialisationVector();
byte[] encryptedBytes = encryptionHelper.encrypt(secretKey, unencryptedBytes, ivbytes);
payload = ByteBuffer.wrap(encryptedBytes);
if (recipientString == null)
{
throw new JMSException("When sending an encrypted message, recipients must be supplied");
}
String[] recipients = recipientString.split(";");
List<List<Object>> encryptedKeys = new ArrayList<>();
for(MessageEncryptionHelper.KeyTransportRecipientInfo info : encryptionHelper.getKeyTransportRecipientInfo(Arrays.asList(recipients), secretKey))
{
encryptedKeys.add(info.asList());
}
BasicContentHeaderProperties oldProps = contentHeaderProperties;
contentHeaderProperties = new BasicContentHeaderProperties(oldProps);
final FieldTable oldHeaders = oldProps.getHeaders();
final FieldTable newHeaders = contentHeaderProperties.getHeaders();
newHeaders.clear();
if(unencryptedProperties != null)
{
List<String> unencryptedPropertyNames = Arrays.asList(unencryptedProperties.split(" *; *"));
for (String propertyName : unencryptedPropertyNames)
{
if (oldHeaders.propertyExists(propertyName))
{
newHeaders.setObject(propertyName, oldHeaders.get(propertyName));
}
}
}
newHeaders.setObject(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, encryptedKeys);
newHeaders.setString(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
encryptionHelper.getMessageEncryptionCipherName());
newHeaders.setBytes(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
contentHeaderProperties.setContentType(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE);
size = encryptedBytes.length;
}
catch (GeneralSecurityException | IOException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException("Unexpected Exception while encrypting message"), e);
}
}
else
{
byte[] compressed;
if (size > getConnection().getMessageCompressionThresholdSize()
&& getConnection().getDelegate().isMessageCompressionSupported()
&& getConnection().isMessageCompressionDesired()
&& contentHeaderProperties.getEncoding() == null
&& (compressed = GZIPUtils.compressBufferToArray(payload)) != null)
{
contentHeaderProperties.setEncoding("gzip");
payload = ByteBuffer.wrap(compressed);
size = compressed.length;
}
}
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
frames = new AMQFrame[2 + contentBodyFrameCount];
if (payload != null)
{
createContentBodies(payload, frames, 2, getChannelId());
}
contentHeaderFrame =
ContentHeaderBody.createAMQFrame(getChannelId(),
contentHeaderProperties, size);
if (getLogger().isDebugEnabled())
{
getLogger().debug("Sending " + (frames.length-2) + " content body frames to " + destination);
}
if (contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize())
{
throw new JMSException("Unable to send message as the headers are too large ("
+ contentHeaderFrame.getSize()
+ " bytes, but the maximum negotiated frame size is "
+ getSession().getAMQConnection().getMaximumFrameSize()
+ ").");
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("Sending content header frame to " + destination);
}
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
final CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
try
{
getSession().checkFlowControl();
}
catch (InterruptedException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException(
"Interrupted while waiting for flow control to be removed"), e);
}
AMQConnectionDelegate_8_0 connectionDelegate80 = (AMQConnectionDelegate_8_0) (getConnection().getDelegate());
boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL
&& (connectionDelegate80.isConfirmedPublishSupported()
|| (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
if(!useConfirms)
{
protocolHandler.writeFrame(compositeFrame);
}
else
{
final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
protocolHandler.getConnectionDetails());
try
{
protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
frameListener);
if(frameListener.isRejected())
{
throw new JMSException("The message was not accepted by the server (e.g. because the address was no longer valid)");
}
}
catch (QpidException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException(e.getMessage()), e);
}
catch (FailoverException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException(
"Fail-over interrupted send. Status of the send is uncertain."), e);
}
}
}