in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java [242:299]
public void init(Broker broker) throws JBIException {
LOGGER.debug(broker.getContainer().getName() + ": Initializing jca flow");
super.init(broker);
// Create and register endpoint listener
endpointListener = new EndpointAdapter() {
public void internalEndpointRegistered(EndpointEvent event) {
onInternalEndpointRegistered(event, true);
}
public void internalEndpointUnregistered(EndpointEvent event) {
onInternalEndpointUnregistered(event, true);
}
};
broker.getContainer().addListener(endpointListener);
// Create and register component listener
componentListener = new ComponentAdapter() {
public void componentStarted(ComponentEvent event) {
onComponentStarted(event);
}
public void componentStopped(ComponentEvent event) {
onComponentStopped(event);
}
};
broker.getContainer().addListener(componentListener);
try {
if (connectionFactory == null) {
connectionFactory = new ActiveMQConnectionFactory(jmsURL);
}
// Inbound connector
ActiveMQDestination dest = new ActiveMQQueue(INBOUND_PREFIX + broker.getContainer().getName());
containerConnector = new Connector(dest, this, true);
containerConnector.start();
// Outbound connector
ActiveMQResourceAdapter outboundRa = new ActiveMQResourceAdapter();
outboundRa.setConnectionFactory(connectionFactory);
//
// We need to explicitly set the server url unless we use the
// default jms url, so set it.
//
if (outboundRa.getInfo().getServerUrl() == null) {
LOGGER.info("ActiveMQResourceAdapter server url was null. Setting it to: " + jmsURL);
outboundRa.getInfo().setServerUrl(jmsURL);
}
ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
mcf.setResourceAdapter(outboundRa);
managedConnectionFactory = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
// Inbound broadcast
broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
} catch (Exception e) {
LOGGER.error("Failed to initialize JCAFlow", e);
throw new JBIException(e);
}
}