in components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java [412:613]
void sendAcknowledgement(byte[] originalHl7MessageBytes, Exchange exchange, TcpSocketConsumerRunnable consumerRunnable) {
log.trace("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - entering",
originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(),
consumerRunnable.getSocket());
getEndpoint().checkBeforeSendProperties(exchange, consumerRunnable.getSocket(), log);
// Find the acknowledgement body
byte[] acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT, byte[].class);
if (acknowledgementMessageBytes == null) {
acknowledgementMessageBytes = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, byte[].class);
}
String acknowledgementMessageType = null;
if (null == acknowledgementMessageBytes) {
boolean autoAck = exchange.getProperty(MllpConstants.MLLP_AUTO_ACKNOWLEDGE, true, boolean.class);
if (!autoAck) {
if (getConfiguration().getExchangePattern() == ExchangePattern.InOut) {
final Object acknowledgementBytesProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT);
Object acknowledgementStringProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING);
final String exceptionMessage
= (acknowledgementBytesProperty == null && acknowledgementStringProperty == null)
? "Automatic Acknowledgement is disabled and the "
+ MllpConstants.MLLP_ACKNOWLEDGEMENT + " and " + MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING
+ " exchange properties are null"
: "Automatic Acknowledgement is disabled and neither the "
+ MllpConstants.MLLP_ACKNOWLEDGEMENT + "(type = "
+ getTypeOrNullString(acknowledgementBytesProperty) + ") nor the"
+ MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING + "(type = "
+ getTypeOrNullString(acknowledgementBytesProperty)
+ ") exchange properties can be converted to byte[]";
MllpInvalidAcknowledgementException invalidAckEx = new MllpInvalidAcknowledgementException(
exceptionMessage, originalHl7MessageBytes, acknowledgementMessageBytes, logPhi);
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, invalidAckEx);
getExceptionHandler().handleException(invalidAckEx);
}
} else {
String acknowledgmentTypeProperty = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
String msa3 = exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_MSA_TEXT, String.class);
Exception exchangeEx = exchange.getException();
try {
if (null == acknowledgmentTypeProperty) {
if (null == exchangeEx) {
acknowledgementMessageType = "AA";
} else {
acknowledgementMessageType = "AE";
if (msa3 == null || msa3.isEmpty()) {
msa3 = exchangeEx.getClass().getName();
}
}
} else {
switch (acknowledgmentTypeProperty) {
case "AA":
acknowledgementMessageType = "AA";
break;
case "AE":
acknowledgementMessageType = "AE";
if (exchangeEx != null && msa3 != null && msa3.isEmpty()) {
msa3 = exchangeEx.getClass().getName();
}
break;
case "AR":
acknowledgementMessageType = "AR";
if (exchangeEx != null && msa3 != null && msa3.isEmpty()) {
msa3 = exchangeEx.getClass().getName();
}
break;
default:
exchange.setException(new Hl7AcknowledgementGenerationException(
hl7Util,
"Unsupported acknowledgment type: " + acknowledgmentTypeProperty));
return;
}
}
hl7Util.generateAcknowledgementPayload(consumerRunnable.getMllpBuffer(), originalHl7MessageBytes,
acknowledgementMessageType, msa3);
} catch (MllpAcknowledgementGenerationException ackGenerationException) {
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, ackGenerationException);
getExceptionHandler().handleException(ackGenerationException);
}
}
} else {
consumerRunnable.getMllpBuffer().setEnvelopedMessage(acknowledgementMessageBytes);
final byte bM = 77;
final byte bS = 83;
final byte bA = 65;
final byte bE = 69;
final byte bR = 82;
final byte fieldSeparator = originalHl7MessageBytes[3];
// Acknowledgment is specified in exchange property - determine the acknowledgement type
for (int i = 0; i < originalHl7MessageBytes.length; ++i) {
if (MllpProtocolConstants.SEGMENT_DELIMITER == i) {
if (i + 7 < originalHl7MessageBytes.length // Make sure we don't run off the end of the message
&& bM == originalHl7MessageBytes[i + 1] && bS == originalHl7MessageBytes[i + 2]
&& bA == originalHl7MessageBytes[i + 3] && fieldSeparator == originalHl7MessageBytes[i + 4]) {
if (fieldSeparator != originalHl7MessageBytes[i + 7]) {
log.warn("MSA-1 is longer than 2-bytes - ignoring trailing bytes");
}
// Found MSA - pull acknowledgement bytes
byte[] acknowledgmentTypeBytes = new byte[2];
acknowledgmentTypeBytes[0] = originalHl7MessageBytes[i + 5];
acknowledgmentTypeBytes[1] = originalHl7MessageBytes[i + 6];
try {
acknowledgementMessageType
= new String(acknowledgmentTypeBytes, ExchangeHelper.getCharsetName(exchange));
} catch (IOException ioEx) {
throw new RuntimeCamelException("Failed to convert acknowledgement message to string", ioEx);
}
// Verify it's a valid acknowledgement code
if (bA != acknowledgmentTypeBytes[0]) {
switch (acknowledgementMessageBytes[1]) {
case bA:
case bR:
case bE:
break;
default:
log.warn("Invalid acknowledgement type [{}] found in message - should be AA, AE or AR",
acknowledgementMessageType);
}
}
// if the MLLP_ACKNOWLEDGEMENT_TYPE property is set on the exchange, make sure it matches
String acknowledgementTypeProperty
= exchange.getProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, String.class);
if (null != acknowledgementTypeProperty
&& !acknowledgementTypeProperty.equals(acknowledgementMessageType)) {
log.warn("Acknowledgement type found in message [{}] does not match {} exchange property "
+ "value [{}] - using value found in message",
acknowledgementMessageType, MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE,
acknowledgementTypeProperty);
}
}
}
}
}
Message message = exchange.getMessage();
if (acknowledgementMessageType != null && !acknowledgementMessageType.isEmpty()) {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_TYPE, acknowledgementMessageType);
}
Charset charset = MllpCharsetHelper.getCharset(exchange, this.charset);
// TODO: re-evaluate this - it seems that the MLLP buffer should be populated by now
if (consumerRunnable.getMllpBuffer().hasCompleteEnvelope()) {
// The mllpBuffer will be used if bufferWrites is set or if auto acknowledgement is used
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, consumerRunnable.getMllpBuffer().toMllpPayload());
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, consumerRunnable.getMllpBuffer().toHl7String(charset));
// Send the acknowledgement
if (log.isDebugEnabled()) {
log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}",
originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(),
consumerRunnable.getSocket(),
consumerRunnable.getMllpBuffer().toPrintFriendlyHl7String());
}
try {
consumerRunnable.getMllpBuffer().writeTo(consumerRunnable.getSocket());
} catch (MllpSocketException acknowledgementDeliveryEx) {
Exception exchangeEx = new MllpAcknowledgementDeliveryException(
"Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes,
acknowledgementDeliveryEx, logPhi);
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
exchange.setException(exchangeEx);
} finally {
consumerRunnable.getMllpBuffer().reset();
}
} else if (acknowledgementMessageBytes != null && acknowledgementMessageBytes.length > 0) {
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT, acknowledgementMessageBytes);
String acknowledgementMessageString = new String(acknowledgementMessageBytes, charset);
message.setHeader(MllpConstants.MLLP_ACKNOWLEDGEMENT_STRING, acknowledgementMessageString);
// Send the acknowledgement
if (log.isDebugEnabled()) {
log.debug("sendAcknowledgement(originalHl7MessageBytes[{}], Exchange[{}], {}) - Sending Acknowledgement: {}",
originalHl7MessageBytes == null ? -1 : originalHl7MessageBytes.length, exchange.getExchangeId(),
consumerRunnable.getSocket(),
hl7Util.convertToPrintFriendlyString(acknowledgementMessageBytes));
}
try {
consumerRunnable.getMllpBuffer().setEnvelopedMessage(acknowledgementMessageBytes);
consumerRunnable.getMllpBuffer().writeTo(consumerRunnable.getSocket());
} catch (MllpSocketException acknowledgementDeliveryEx) {
Exception exchangeEx = new MllpAcknowledgementDeliveryException(
"Failure delivering acknowledgment", originalHl7MessageBytes, acknowledgementMessageBytes,
acknowledgementDeliveryEx, logPhi);
exchange.setProperty(MllpConstants.MLLP_ACKNOWLEDGEMENT_EXCEPTION, acknowledgementDeliveryEx);
exchange.setException(exchangeEx);
}
}
getEndpoint().checkAfterSendProperties(exchange, consumerRunnable.getSocket(), log);
}