public void process()

in components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java [136:358]


    public void process(Exchange exchange) throws MllpException {
        lock.lock();
        try {
            log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket);
            getEndpoint().updateLastConnectionActivityTicks();

            Message message = exchange.getMessage();

            getEndpoint().checkBeforeSendProperties(exchange, socket, log);

            // Establish a connection if needed
            try {
                checkConnection();

                if (cachedLocalAddress != null) {
                    message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, cachedLocalAddress);
                }

                if (cachedRemoteAddress != null) {
                    message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, cachedRemoteAddress);
                }

                // Send the message to the external system
                byte[] hl7MessageBytes = null;
                Object messageBody = message.getBody();
                if (messageBody == null) {
                    String exceptionMessage
                            = String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket);
                    exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
                    return;
                } else if (messageBody instanceof byte[]) {
                    hl7MessageBytes = (byte[]) messageBody;
                } else if (messageBody instanceof String) {
                    String stringBody = (String) messageBody;
                    hl7MessageBytes = stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
                    if (getConfiguration().hasCharsetName()) {
                        exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, getConfiguration().getCharsetName());
                    }
                }

                log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket);

                try {
                    mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                    mllpBuffer.writeTo(socket);
                } catch (MllpSocketException writeEx) {
                    // Connection may have been reset - try one more time
                    log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect",
                            exchange.getExchangeId(), socket, writeEx);
                    try {
                        checkConnection();
                        log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(),
                                socket);
                        try {
                            mllpBuffer.writeTo(socket);
                        } catch (MllpSocketException retryWriteEx) {
                            String exceptionMessage = String.format(
                                    "process(%s) [%s] - exception encountered attempting to write payload after reconnect",
                                    exchange.getExchangeId(), socket);
                            log.warn(exceptionMessage, retryWriteEx);
                            exchange.setException(
                                    new MllpWriteException(
                                            exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
                        }
                    } catch (IOException reconnectEx) {
                        String exceptionMessage
                                = String.format("process(%s) [%s] - exception encountered attempting to reconnect",
                                        exchange.getExchangeId(), socket);
                        log.warn(exceptionMessage, reconnectEx);
                        exchange.setException(
                                new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
                        mllpBuffer.resetSocket(socket);
                    }
                }
                if (getConfiguration().getExchangePattern() == ExchangePattern.InOnly) {
                    log.debug("process({}) [{}] - not checking acknowledgement from external system",
                            exchange.getExchangeId(), socket);
                    return;
                }
                if (exchange.getException() == null) {
                    log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(),
                            socket);
                    try {
                        mllpBuffer.reset();
                        mllpBuffer.readFrom(socket);
                    } catch (MllpSocketException receiveAckEx) {
                        // Connection may have been reset - try one more time
                        log.debug("process({}) [{}] - exception encountered reading acknowledgement - attempting reconnect",
                                exchange.getExchangeId(), socket, receiveAckEx);
                        try {
                            checkConnection();
                        } catch (IOException reconnectEx) {
                            String exceptionMessage = String.format(
                                    "process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure",
                                    exchange.getExchangeId(), socket);
                            log.warn(exceptionMessage, reconnectEx);
                            exchange.setException(
                                    new MllpAcknowledgementReceiveException(
                                            exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
                            mllpBuffer.resetSocket(socket);
                        }

                        if (exchange.getException() == null) {
                            log.trace("process({}) [{}] - resending payload after successful reconnect",
                                    exchange.getExchangeId(),
                                    socket);
                            try {
                                mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
                                mllpBuffer.writeTo(socket);
                            } catch (MllpSocketException writeRetryEx) {
                                String exceptionMessage = String.format(
                                        "process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect",
                                        exchange.getExchangeId(), socket);
                                log.warn(exceptionMessage, writeRetryEx);
                                exchange.setException(
                                        new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
                            }

                            if (exchange.getException() == null) {
                                log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from external system",
                                        exchange.getExchangeId(), socket);
                                try {
                                    mllpBuffer.reset();
                                    mllpBuffer.readFrom(socket);
                                } catch (MllpSocketException secondReceiveEx) {
                                    String exceptionMessageFormat = mllpBuffer.isEmpty()
                                            ? "process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend"
                                            : "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend";
                                    String exceptionMessage
                                            = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
                                    log.warn(exceptionMessage, secondReceiveEx);
                                    // Send the original exception to the exchange
                                    exchange.setException(new MllpAcknowledgementReceiveException(
                                            exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx,
                                            logPhi));
                                } catch (SocketTimeoutException secondReadTimeoutEx) {
                                    String exceptionMessageFormat = mllpBuffer.isEmpty()
                                            ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend"
                                            : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend";
                                    String exceptionMessage
                                            = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
                                    log.warn(exceptionMessage, secondReadTimeoutEx);
                                    // Send the original exception to the exchange
                                    exchange.setException(new MllpAcknowledgementTimeoutException(
                                            exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx,
                                            logPhi));
                                    mllpBuffer.resetSocket(socket);
                                }
                            }
                        }
                    } catch (SocketTimeoutException timeoutEx) {
                        String exceptionMessageFormat = mllpBuffer.isEmpty()
                                ? "process(%s) [%s] - timeout receiving MLLP Acknowledgment"
                                : "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment";
                        String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
                        log.warn(exceptionMessage, timeoutEx);
                        exchange.setException(new MllpAcknowledgementTimeoutException(
                                exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
                        mllpBuffer.resetSocket(socket);
                    }

                    if (exchange.getException() == null) {
                        if (mllpBuffer.hasCompleteEnvelope()) {
                            byte[] acknowledgementBytes = mllpBuffer.toMllpPayload();

                            log.debug(
                                    "process({}) [{}] - populating message headers with the acknowledgement from the external system",
                                    exchange.getExchangeId(), socket);
                            message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
                            if (acknowledgementBytes != null && acknowledgementBytes.length > 0) {
                                message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
                                        acknowledgementBytes,
                                        MllpCharsetHelper.getCharset(exchange, acknowledgementBytes, hl7Util, charset)));
                            } else {
                                message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
                            }

                            if (getConfiguration().isValidatePayload()) {
                                String exceptionMessage = hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
                                if (exceptionMessage != null) {
                                    exchange.setException(new MllpInvalidAcknowledgementException(
                                            exceptionMessage, hl7MessageBytes, acknowledgementBytes, logPhi));
                                }
                            }

                            if (exchange.getException() == null) {
                                log.debug("process({}) [{}] - processing the acknowledgement from the external system",
                                        exchange.getExchangeId(), socket);
                                try {
                                    message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
                                            processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
                                } catch (MllpNegativeAcknowledgementException nackEx) {
                                    message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, nackEx.getAcknowledgmentType());
                                    exchange.setException(nackEx);
                                }

                                getEndpoint().checkAfterSendProperties(exchange, socket, log);
                            }
                        } else {
                            String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received",
                                    exchange.getExchangeId(), socket);
                            exchange.setException(new MllpInvalidAcknowledgementException(
                                    exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), logPhi));
                        }
                    }
                }

            } catch (IOException ioEx) {
                log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket,
                        ioEx);
                exchange.setException(ioEx);
                mllpBuffer.resetSocket(socket);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                mllpBuffer.reset();
            }

            log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket);
        } finally {
            lock.unlock();
        }
    }