in components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpClientProducer.java [136:358]
public void process(Exchange exchange) throws MllpException {
lock.lock();
try {
log.trace("process({}) [{}] - entering", exchange.getExchangeId(), socket);
getEndpoint().updateLastConnectionActivityTicks();
Message message = exchange.getMessage();
getEndpoint().checkBeforeSendProperties(exchange, socket, log);
// Establish a connection if needed
try {
checkConnection();
if (cachedLocalAddress != null) {
message.setHeader(MllpConstants.MLLP_LOCAL_ADDRESS, cachedLocalAddress);
}
if (cachedRemoteAddress != null) {
message.setHeader(MllpConstants.MLLP_REMOTE_ADDRESS, cachedRemoteAddress);
}
// Send the message to the external system
byte[] hl7MessageBytes = null;
Object messageBody = message.getBody();
if (messageBody == null) {
String exceptionMessage
= String.format("process(%s) [%s] - message body is null", exchange.getExchangeId(), socket);
exchange.setException(new MllpInvalidMessageException(exceptionMessage, hl7MessageBytes, logPhi));
return;
} else if (messageBody instanceof byte[]) {
hl7MessageBytes = (byte[]) messageBody;
} else if (messageBody instanceof String) {
String stringBody = (String) messageBody;
hl7MessageBytes = stringBody.getBytes(MllpCharsetHelper.getCharset(exchange, charset));
if (getConfiguration().hasCharsetName()) {
exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, getConfiguration().getCharsetName());
}
}
log.debug("process({}) [{}] - sending message to external system", exchange.getExchangeId(), socket);
try {
mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
mllpBuffer.writeTo(socket);
} catch (MllpSocketException writeEx) {
// Connection may have been reset - try one more time
log.debug("process({}) [{}] - exception encountered writing payload - attempting reconnect",
exchange.getExchangeId(), socket, writeEx);
try {
checkConnection();
log.trace("process({}) [{}] - reconnected succeeded - resending payload", exchange.getExchangeId(),
socket);
try {
mllpBuffer.writeTo(socket);
} catch (MllpSocketException retryWriteEx) {
String exceptionMessage = String.format(
"process(%s) [%s] - exception encountered attempting to write payload after reconnect",
exchange.getExchangeId(), socket);
log.warn(exceptionMessage, retryWriteEx);
exchange.setException(
new MllpWriteException(
exceptionMessage, mllpBuffer.toByteArrayAndReset(), retryWriteEx, logPhi));
}
} catch (IOException reconnectEx) {
String exceptionMessage
= String.format("process(%s) [%s] - exception encountered attempting to reconnect",
exchange.getExchangeId(), socket);
log.warn(exceptionMessage, reconnectEx);
exchange.setException(
new MllpWriteException(exceptionMessage, mllpBuffer.toByteArrayAndReset(), writeEx, logPhi));
mllpBuffer.resetSocket(socket);
}
}
if (getConfiguration().getExchangePattern() == ExchangePattern.InOnly) {
log.debug("process({}) [{}] - not checking acknowledgement from external system",
exchange.getExchangeId(), socket);
return;
}
if (exchange.getException() == null) {
log.debug("process({}) [{}] - reading acknowledgement from external system", exchange.getExchangeId(),
socket);
try {
mllpBuffer.reset();
mllpBuffer.readFrom(socket);
} catch (MllpSocketException receiveAckEx) {
// Connection may have been reset - try one more time
log.debug("process({}) [{}] - exception encountered reading acknowledgement - attempting reconnect",
exchange.getExchangeId(), socket, receiveAckEx);
try {
checkConnection();
} catch (IOException reconnectEx) {
String exceptionMessage = String.format(
"process(%s) [%s] - exception encountered attempting to reconnect after acknowledgement read failure",
exchange.getExchangeId(), socket);
log.warn(exceptionMessage, reconnectEx);
exchange.setException(
new MllpAcknowledgementReceiveException(
exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
mllpBuffer.resetSocket(socket);
}
if (exchange.getException() == null) {
log.trace("process({}) [{}] - resending payload after successful reconnect",
exchange.getExchangeId(),
socket);
try {
mllpBuffer.setEnvelopedMessage(hl7MessageBytes);
mllpBuffer.writeTo(socket);
} catch (MllpSocketException writeRetryEx) {
String exceptionMessage = String.format(
"process(%s) [%s] - exception encountered attempting to write payload after read failure and successful reconnect",
exchange.getExchangeId(), socket);
log.warn(exceptionMessage, writeRetryEx);
exchange.setException(
new MllpWriteException(exceptionMessage, hl7MessageBytes, receiveAckEx, logPhi));
}
if (exchange.getException() == null) {
log.trace("process({}) [{}] - resend succeeded - reading acknowledgement from external system",
exchange.getExchangeId(), socket);
try {
mllpBuffer.reset();
mllpBuffer.readFrom(socket);
} catch (MllpSocketException secondReceiveEx) {
String exceptionMessageFormat = mllpBuffer.isEmpty()
? "process(%s) [%s] - exception encountered reading MLLP Acknowledgement after successful reconnect and resend"
: "process(%s) [%s] - exception encountered reading complete MLLP Acknowledgement after successful reconnect and resend";
String exceptionMessage
= String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
log.warn(exceptionMessage, secondReceiveEx);
// Send the original exception to the exchange
exchange.setException(new MllpAcknowledgementReceiveException(
exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx,
logPhi));
} catch (SocketTimeoutException secondReadTimeoutEx) {
String exceptionMessageFormat = mllpBuffer.isEmpty()
? "process(%s) [%s] - timeout receiving MLLP Acknowledgment after successful reconnect and resend"
: "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment after successful reconnect and resend";
String exceptionMessage
= String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
log.warn(exceptionMessage, secondReadTimeoutEx);
// Send the original exception to the exchange
exchange.setException(new MllpAcknowledgementTimeoutException(
exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), receiveAckEx,
logPhi));
mllpBuffer.resetSocket(socket);
}
}
}
} catch (SocketTimeoutException timeoutEx) {
String exceptionMessageFormat = mllpBuffer.isEmpty()
? "process(%s) [%s] - timeout receiving MLLP Acknowledgment"
: "process(%s) [%s] - timeout receiving complete MLLP Acknowledgment";
String exceptionMessage = String.format(exceptionMessageFormat, exchange.getExchangeId(), socket);
log.warn(exceptionMessage, timeoutEx);
exchange.setException(new MllpAcknowledgementTimeoutException(
exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), timeoutEx, logPhi));
mllpBuffer.resetSocket(socket);
}
if (exchange.getException() == null) {
if (mllpBuffer.hasCompleteEnvelope()) {
byte[] acknowledgementBytes = mllpBuffer.toMllpPayload();
log.debug(
"process({}) [{}] - populating message headers with the acknowledgement from the external system",
exchange.getExchangeId(), socket);
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementBytes);
if (acknowledgementBytes != null && acknowledgementBytes.length > 0) {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, new String(
acknowledgementBytes,
MllpCharsetHelper.getCharset(exchange, acknowledgementBytes, hl7Util, charset)));
} else {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, "");
}
if (getConfiguration().isValidatePayload()) {
String exceptionMessage = hl7Util.generateInvalidPayloadExceptionMessage(acknowledgementBytes);
if (exceptionMessage != null) {
exchange.setException(new MllpInvalidAcknowledgementException(
exceptionMessage, hl7MessageBytes, acknowledgementBytes, logPhi));
}
}
if (exchange.getException() == null) {
log.debug("process({}) [{}] - processing the acknowledgement from the external system",
exchange.getExchangeId(), socket);
try {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
processAcknowledgment(hl7MessageBytes, acknowledgementBytes));
} catch (MllpNegativeAcknowledgementException nackEx) {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, nackEx.getAcknowledgmentType());
exchange.setException(nackEx);
}
getEndpoint().checkAfterSendProperties(exchange, socket, log);
}
} else {
String exceptionMessage = String.format("process(%s) [%s] - invalid acknowledgement received",
exchange.getExchangeId(), socket);
exchange.setException(new MllpInvalidAcknowledgementException(
exceptionMessage, hl7MessageBytes, mllpBuffer.toByteArrayAndReset(), logPhi));
}
}
}
} catch (IOException ioEx) {
log.debug("process({}) [{}] - IOException encountered checking connection", exchange.getExchangeId(), socket,
ioEx);
exchange.setException(ioEx);
mllpBuffer.resetSocket(socket);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
mllpBuffer.reset();
}
log.trace("process({}) [{}] - exiting", exchange.getExchangeId(), socket);
} finally {
lock.unlock();
}
}