public void sendMessage()

in modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/AMQPTransportSender.java [115:261]


    public void sendMessage(MessageContext msgCtx,
                            String targetEPR,
                            OutTransportInfo outTransportInfo) throws AxisFault {

        AMQPSender amqpSender;
        Integer hashKey = null;
        Map<String, String> params = null;
        String replyTo = null;
        AMQPTransportProducerTx tx;
        MessageContext replyMsgCtx = msgCtx.getOperationContext().getMessageContext(
                WSDL2Constants.MESSAGE_LABEL_IN);
        if (replyMsgCtx != null) {
            replyTo = (String) replyMsgCtx.getProperty(AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
        }

        if (replyTo != null) {
            // this is a response for a request message(request/response semantic message)
            hashKey = replyTo.hashCode();
            params = new HashMap<String, String>();
            params.put(AMQPTransportConstant.PARAMETER_QUEUE_NAME, replyTo);

            String conFacName = (String) msgCtx.getOperationContext().
                    getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN).
                    getProperty(AMQPTransportConstant.RESPONSE_CONNECTION_FACTORY_NAME);
            if (conFacName == null) {
                throw new AxisFault("A message was received with 'reply to' set. But no reply " +
                        "connection factory name found. Define the parameter '" +
                        AMQPTransportConstant.PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME +
                        "' as a service parameter. This response message will be dropped!");
            } else {
                params.put(AMQPTransportConstant.PARAMETER_CONNECTION_FACTORY_NAME, conFacName);
            }
        } else {
            // this is a normal one way out message
            if (targetEPR != null) {
                hashKey = new Integer(targetEPR.hashCode());
                try {
                    params = AMQPTransportUtils.parseAMQPUri(targetEPR);
                } catch (AMQPTransportException e) {
                    throw new AxisFault("Error while parsing the AMQP epr '" + targetEPR + "'", e);
                }
            } else if (outTransportInfo != null && outTransportInfo instanceof AMQPOutTransportInfo) {
                AMQPOutTransportInfo info = (AMQPOutTransportInfo) outTransportInfo;
                params = info.getParams();

            } else {
                throw new AxisFault("Could not determine the endpoint information to deliver the message");
            }
        }

        if (cache.hit(hashKey)) {
            amqpSender = cache.get(hashKey);
        } else {
            try {
                amqpSender = AMQPSenderFactory.createAMQPSender(connectionFactoryManager, params);
                cache.add(hashKey, amqpSender);
            } catch (IOException e) {
                throw new AxisFault("Could not create the AMQP sender", e);
            }
        }

        try {
            String correlationId = (String)
                    msgCtx.getProperty(AMQPTransportConstant.PROPERTY_AMQP_CORRELATION_ID);
            if (correlationId == null) {
                correlationId = msgCtx.getMessageID();
            }

            boolean isInOut = waitForSynchronousResponse(msgCtx);
            Semaphore available = null;
            if (isInOut) {
                replyTo = (String) msgCtx.getProperty(
                        AMQPTransportConstant.PROPERTY_AMQP_REPLY_TO);
                if (replyTo == null) {
                    replyTo = UUID.randomUUID().toString();
                }
                available = new Semaphore(0, true);
                responseTracker.put(correlationId, available);
            }

            String useTx = (String) msgCtx.getProperty(AMQPTransportConstant.PROPERTY_PRODUCER_TX);

            if (AMQPTransportConstant.AMQP_USE_LWPC.equals(useTx)) {
                tx = new AMQPTransportProducerTx(true, amqpSender.getChannel());
            } else if (AMQPTransportConstant.AMQP_USE_TX.equals(useTx)) {
                tx = new AMQPTransportProducerTx(false, amqpSender.getChannel());
            } else {
                tx = null;
            }

            if (tx != null) {
                try {
                    tx.start();
                } catch (IOException e) {
                    throw new AxisFault("Error while initiation tx for message '" +
                            msgCtx.getMessageID() + "'", e);
                }
            }

            amqpSender.sendAMQPMessage(msgCtx, correlationId, replyTo);

            if (tx != null) {
                try {
                    tx.end();
                } catch (IOException e) {
                    throw new AxisFault("Error while terminating tx for message '" +
                            msgCtx.getMessageID() + "'", e);
                } catch (InterruptedException e) {
                    log.error("Error while terminating tx for message '" +
                            msgCtx.getMessageID() + "'", e);
                    Thread.currentThread().interrupt();
                }
            }

            if (isInOut) {
                // block and process the response
                new AMQPSimpleConsumerTask(
                        responseHandlingPool,
                        amqpSender.getChannel(),
                        replyTo,
                        responseTracker,
                        responseMessage).
                        consume();
                try {
                    available.tryAcquire(semaphoreTimeOut, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                responseTracker.remove(correlationId);
                AMQPTransportMessage msg = responseMessage.get(correlationId);
                if (msg != null) {
                    handleSyncResponse(msgCtx, msg, msg.getContentType());
                } else {
                    // we don't have a response yet, so send a fault to client
                    log.warn("The semaphore with id '" + correlationId + "' was time out while "
                            + "waiting for a response, sending a fault to client..");
                    sendFault(msgCtx,
                            new Exception("Times out occurs while waiting for a response"));
                }
            }
        } catch (AMQPTransportException e) {
            throw new AxisFault("Could not retrieve the connection factory information", e);
        } catch (IOException e) {
            throw new AxisFault("Could not produce the message into the destination", e);
        }
    }