void sendMessage()

in client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java [110:396]


    /**
     * Builds the AMQP 0-8/0-9 frames for a single message (basic.publish, content header and
     * content bodies) and writes them to the broker as one composite data block.
     * <p>
     * In order, the method: resolves the routing key (optionally from the qpid.subject header for
     * ADDR-syntax destinations), stamps user id, destination type, timestamp/expiration, delivery
     * delay, delivery mode and priority on the content header, then either encrypts the payload
     * (together with the serialised header properties) or, failing that, optionally gzip-compresses
     * it, and finally sends the frames - waiting for a broker confirmation when the publish mode
     * and the connection capabilities call for it.
     *
     * @param destination   where the message is published; supplies exchange name, routing key,
     *                      subject and encryption settings
     * @param origMessage   not read by this method
     * @param message       the message to send; its delegate must be an {@code AMQMessageDelegate_0_8}
     * @param messageId     not read by this method
     * @param deliveryMode  value stored (narrowed to a byte) as the delivery mode header
     * @param priority      value stored (narrowed to a byte) as the priority header
     * @param timeToLive    time to live in milliseconds; values {@code <= 0} yield an expiration of 0.
     *                      Only applied when timestamps are not disabled
     * @param mandatory     passed through to the basic.publish body
     * @param immediate     passed through to the basic.publish body
     * @param deliveryDelay when non-zero, and the message carries no explicit not-valid-before
     *                      header, that header is set to {@code now + deliveryDelay}
     * @throws JMSException if encryption is requested without recipients or fails, if the content
     *                      header frame exceeds the negotiated maximum frame size, if the thread is
     *                      interrupted while waiting on flow control, or if a confirmed publish is
     *                      rejected or fails
     */
    void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                     UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
                     boolean immediate, final long deliveryDelay) throws JMSException
    {
        AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
        BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();

        String routingKey = destination.getRoutingKey();

        FieldTable headers = delegate.getContentHeaderProperties().getHeaders();

        // For ADDR-syntax destinations a subject (from the message or, as a fallback, from the
        // address) is propagated into the headers and, for topics, used as the routing key.
        // NOTE(review): headers is null-checked in the condition but dereferenced unconditionally
        // below (and elsewhere in this method); presumably getHeaders() never returns null - confirm.
        if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
            (destination.getSubject() != null
             || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
        {

            if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
            {
                // use default subject in address string
                headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
            }

            if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
            {
                routingKey = headers.getString(QpidMessageProperties.QPID_SUBJECT);
            }
        }

        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
                                                                                        destination.getExchangeName(),
                                                                                        routingKey,
                                                                                        mandatory,
                                                                                        immediate);

        AMQFrame publishFrame = body.generateFrame(getChannelId());

        message.prepareForSending();
        ByteBuffer payload = message.getData();

        contentHeaderProperties.setUserId(getUserID());

        //Set the JMS_QPID_DESTTYPE for 0-8/9 messages
        int type;
        if (destination instanceof Topic)
        {
            type = AMQDestination.TOPIC_TYPE;
        }
        else if (destination instanceof Queue)
        {
            type = AMQDestination.QUEUE_TYPE;
        }
        else
        {
            type = AMQDestination.UNKNOWN_TYPE;
        }

        //Set JMS_QPID_DESTTYPE
        delegate.getContentHeaderProperties()
                .getHeaders()
                .setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);

        // currentTime stays 0 when timestamps are disabled; in that case neither the timestamp nor
        // the expiration header is written here.
        long currentTime;
        if (!isDisableTimestamps())
        {

            currentTime = System.currentTimeMillis();
            contentHeaderProperties.setTimestamp(currentTime);

            if (timeToLive > 0)
            {
                if (!SET_EXPIRATION_AS_TTL)
                {
                    //default behaviour used by Qpid
                    contentHeaderProperties.setExpiration(currentTime + timeToLive);
                }
                else
                {
                    //alternative behaviour for brokers interpreting the expiration header directly as a TTL.
                    contentHeaderProperties.setExpiration(timeToLive);
                }
            }
            else
            {
                contentHeaderProperties.setExpiration(0);
            }
        }
        else
        {
            currentTime = 0L;
        }

        // An explicit not-valid-before header on the message wins over the producer's delivery delay.
        if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null)
        {
            if(currentTime == 0L)
            {
                // timestamps are disabled, but the delay still needs a reference point
                currentTime = System.currentTimeMillis();
            }
            headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
        }


        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
        contentHeaderProperties.setPriority((byte) priority);

        int size = (payload != null) ? payload.remaining() : 0;
        AMQFrame contentHeaderFrame;
        final AMQFrame[] frames;
        boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted();
        if(encrypt)
        {
            MessageEncryptionHelper encryptionHelper = getSession().getMessageEncryptionHelper();
            try
            {
                SecretKeySpec secretKey = encryptionHelper.createSecretKey();

                // The encryption control properties are read and then stripped so that they are
                // not part of the header properties serialised below.
                contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_HEADER);

                String recipientString = message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
                if(recipientString == null)
                {
                    recipientString = destination.getEncryptedRecipients();
                }
                contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);

                String unencryptedProperties = message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
                contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);

                // Fail fast: without recipients the message cannot be sent, so reject it before
                // serialising the header and encrypting the payload.
                if (recipientString == null)
                {
                    throw new JMSException("When sending an encrypted message, recipients must be supplied");
                }

                // Plain text to encrypt = 2 bytes of property flags + property list + original payload.
                final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
                byte[] unencryptedBytes = new byte[headerLength + size];
                ByteBuffer output = ByteBuffer.wrap(unencryptedBytes);
                output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
                contentHeaderProperties.writePropertyListPayload(output);

                if (size != 0)
                {
                    payload.get(unencryptedBytes, headerLength, payload.remaining());
                }

                byte[] ivbytes = encryptionHelper.getInitialisationVector();

                byte[] encryptedBytes = encryptionHelper.encrypt(secretKey, unencryptedBytes, ivbytes);
                payload = ByteBuffer.wrap(encryptedBytes);

                String[] recipients = recipientString.split(";");
                List<List<Object>> encryptedKeys = new ArrayList<>();
                for(MessageEncryptionHelper.KeyTransportRecipientInfo info : encryptionHelper.getKeyTransportRecipientInfo(Arrays.asList(recipients), secretKey))
                {
                    encryptedKeys.add(info.asList());
                }

                // The outer (visible) header is a copy of the original properties whose header
                // table is emptied and then repopulated with only the explicitly whitelisted
                // properties plus the encryption metadata.
                BasicContentHeaderProperties oldProps = contentHeaderProperties;
                contentHeaderProperties = new BasicContentHeaderProperties(oldProps);
                final FieldTable oldHeaders = oldProps.getHeaders();
                final FieldTable newHeaders = contentHeaderProperties.getHeaders();
                newHeaders.clear();

                if(unencryptedProperties != null)
                {
                    for (String propertyName : unencryptedProperties.split(" *; *"))
                    {
                        if (oldHeaders.propertyExists(propertyName))
                        {
                            newHeaders.setObject(propertyName, oldHeaders.get(propertyName));
                        }
                    }
                }

                newHeaders.setObject(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, encryptedKeys);
                newHeaders.setString(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
                                     encryptionHelper.getMessageEncryptionCipherName());
                newHeaders.setBytes(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
                contentHeaderProperties.setContentType(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE);
                size = encryptedBytes.length;

            }
            catch (GeneralSecurityException | IOException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException("Unexpected Exception while encrypting message"), e);
            }

        }
        else
        {
            // Compression is attempted only for unencrypted messages above the configured
            // threshold, when supported and desired, and when no encoding has been set already.
            // The compression call is last in the chain so it only runs if every other test passes.
            byte[] compressed;
            if (size > getConnection().getMessageCompressionThresholdSize()
                && getConnection().getDelegate().isMessageCompressionSupported()
                && getConnection().isMessageCompressionDesired()
                && contentHeaderProperties.getEncoding() == null
                && (compressed = GZIPUtils.compressBufferToArray(payload)) != null)
            {
                contentHeaderProperties.setEncoding("gzip");
                payload = ByteBuffer.wrap(compressed);
                size = compressed.length;

            }
        }

        // Slot 0 is the publish frame, slot 1 the content header; content bodies start at index 2.
        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
        frames = new AMQFrame[2 + contentBodyFrameCount];

        if (payload != null)
        {
            createContentBodies(payload, frames, 2, getChannelId());
        }

        contentHeaderFrame =
                ContentHeaderBody.createAMQFrame(getChannelId(),
                                                 contentHeaderProperties, size);


        if (getLogger().isDebugEnabled())
        {
            getLogger().debug("Sending " + (frames.length-2) + " content body frames to " + destination);
        }

        if (contentHeaderFrame.getSize() > getSession().getAMQConnection().getMaximumFrameSize())
        {
            throw new JMSException("Unable to send message as the headers are too large ("
                                   + contentHeaderFrame.getSize()
                                   + " bytes, but the maximum negotiated frame size is "
                                   + getSession().getAMQConnection().getMaximumFrameSize()
                                   + ").");
        }
        if (getLogger().isDebugEnabled())
        {
            getLogger().debug("Sending content header frame to " + destination);
        }


        frames[0] = publishFrame;
        frames[1] = contentHeaderFrame;
        final CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);

        try
        {
            getSession().checkFlowControl();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt status so that callers further up the stack can still observe
            // the interruption after it has been translated into a JMSException.
            Thread.currentThread().interrupt();
            throw JMSExceptionHelper.chainJMSException(new JMSException(
                    "Interrupted while waiting for flow control to be removed"), e);
        }

        AMQConnectionDelegate_8_0 connectionDelegate80 = (AMQConnectionDelegate_8_0) (getConnection().getDelegate());

        // Confirms are used only in SYNC_PUBLISH_ALL mode, and only if the connection supports
        // them in general or supports them for this (non-transacted) session.
        boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL
                              && (connectionDelegate80.isConfirmedPublishSupported()
                               || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));

        AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
        if(!useConfirms)
        {
            protocolHandler.writeFrame(compositeFrame);
        }
        else
        {
            final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
                                                                                                  protocolHandler.getConnectionDetails());
            try
            {

                protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
                                                                                      frameListener);

                if(frameListener.isRejected())
                {
                    throw new JMSException("The message was not accepted by the server (e.g. because the address was no longer valid)");
                }
            }
            catch (QpidException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException(e.getMessage()), e);
            }
            catch (FailoverException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException(
                        "Fail-over interrupted send. Status of the send is uncertain."), e);

            }
        }
    }