in message-bridge/src/main/java/org/acme/message/bridge/MessageBridgeRoute.java [30:75]
public void configure() throws Exception {
    // Declares the routes of the message bridge:
    //   "rest"      : REST POST /message                -> direct:publish
    //   "ibmmq"     : direct:publish                    -> IBM MQ queue {{ibm.mq.queue}} (transacted)
    //   "ibmmq-amq" : IBM MQ queue {{ibm.mq.queue}}     -> ActiveMQ queue {{amq.queue}} (transacted)
    //   "amq"       : ActiveMQ queue {{amq.queue}}      -> log only
    // Bodies containing "crash" or "rollback" trigger the failure simulations in the "ibmmq-amq" route.

    rest()
        .post("/message")
        .id("rest")
        .to("direct:publish");

    from("direct:publish")
        .id("ibmmq")
        .transacted()
        .log("Sending message to IBMMQ: ${body}")
        .to("ibmmq:queue:{{ibm.mq.queue}}?disableReplyTo=true");

    from("ibmmq:queue:{{ibm.mq.queue}}")
        .id("ibmmq-amq")
        .transacted()
        .process(ex -> {
            // Enlist our custom XAResource.
            // If the body contains "crash", this resource will kill the JVM during transaction commit;
            // this resource is then used to recover the message after the crash.
            // A message without a String body is treated as a regular (non-crash) message
            // rather than failing with a NullPointerException.
            String body = ex.getIn().getBody(String.class);
            boolean crashRequested = body != null && body.contains("crash");
            DummyXAResource xaResource = new DummyXAResource(crashRequested);
            transactionManager.getTransaction().enlistResource(xaResource);
        })
        .choice()
            .when(simple("${header.JMSRedelivered}"))
                .log("Redelivering message after rollback to ActiveMQ: ${body}")
            .otherwise()
                .log("Sending message from IBMMQ to ActiveMQ: ${body}")
        .end()
        .to("amq:queue:{{amq.queue}}")
        .process(ex -> {
            String body = ex.getIn().getBody(String.class);
            // Locale.ROOT keeps the case-insensitive match independent of the JVM default locale.
            if (body != null && body.toLowerCase(java.util.Locale.ROOT).contains("rollback")) {
                // To simulate the rollback just once, we examine the JMSRedelivered flag of the message:
                // - not "true" (false or absent): we initiate the rollback
                // - "true": the rollback has already occurred, so we allow the message
                //   to proceed through the route successfully
                // Boolean.TRUE.equals(...) avoids an unboxing NullPointerException when the header is missing.
                Boolean redelivered = ex.getIn().getHeader("JMSRedelivered", Boolean.class);
                if (!Boolean.TRUE.equals(redelivered)) {
                    // Simulate rollback
                    throw new RuntimeException("Simulated rollback");
                }
            }
        });

    from("amq:queue:{{amq.queue}}")
        .id("amq")
        .log("ActiveMQ received: ${body}");
}