in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java [436:647]
/**
 * Looks up, or creates and caches, the JMS connection used to send messages to the given
 * endpoint, and makes sure it is open before returning it.
 * <p>
 * The method first waits on {@code controllerLatch}, then works while holding a permit from
 * {@code connectionSemaphore}. It resolves the broker URL for the endpoint, obtains the
 * {@code BrokerConnectionEntry} for that URL (replacing it when the broker connection is closed
 * or failed), starts the session reaper timer for newly created entries, and finally either
 * creates a new endpoint connection or reuses (and if necessary re-opens) a cached one.
 * <p>
 * If the calling thread is interrupted while waiting, the interrupt status is restored before
 * this method returns so that callers can observe it.
 *
 * @param anEndpoint
 *          the endpoint to connect to; its destination may be updated from the aggregate's
 *          master endpoint when a new connection is created
 * @return the endpoint connection, or {@code null} if the thread was interrupted before the
 *         connection could be obtained or the cached lookup yielded no connection
 * @throws AsynchAEException
 *           if the broker connection entry cannot be looked up, or if a failed delegate reply
 *           listener cannot be re-created or has not been recovered yet
 * @throws ServiceShutdownException
 *           declared by this method; presumably raised by the connection helpers it calls
 * @throws ConnectException
 *           declared by this method; presumably raised when a broker connection cannot be made
 */
private synchronized JmsEndpointConnection_impl getEndpointConnection(Endpoint anEndpoint)
        throws AsynchAEException, ServiceShutdownException, ConnectException {
  // An interrupt is recorded rather than acted on immediately, so that the method keeps its
  // original flow; the status is restored in the finally block below.
  boolean interrupted = false;
  try {
    controllerLatch.await();
  } catch (InterruptedException e) {
    interrupted = true;
  }
  JmsEndpointConnection_impl endpointConnection = null;
  String brokerConnectionURL = null;
  // Tracks whether a permit was really obtained, so that an interrupted acquire() does not
  // lead to releasing a permit this thread never held.
  boolean permitAcquired = false;
  try {
    connectionSemaphore.acquire();
    permitAcquired = true;
    // If sending a Free Cas Request to a remote Cas Multiplier always use the CM's broker
    if (anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier()
            && anEndpoint.isReplyEndpoint()) {
      brokerConnectionURL = anEndpoint.getServerURI();
    } else {
      // If this is a reply to a client, use the same broker URL that manages this service
      // input queue. Otherwise this is a request so use a broker specified in the endpoint
      // object.
      brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
    }
    String key = getLookupKey(anEndpoint);
    String destination = getDestinationName(anEndpoint);
    // First get the entry holding the destinations managed by the selected broker
    BrokerConnectionEntry brokerConnectionEntry = null;
    boolean startInactivityReaperTimer = false;
    if (connectionMap.containsKey(brokerConnectionURL)) {
      brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
      // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
      // the null check just in case
      if (brokerConnectionEntry == null) {
        throw new AsynchAEException("Controller:"
                + getAnalysisEngineController().getComponentName()
                + " Unable to Lookup Broker Connection For URL:" + brokerConnectionURL);
      }
      brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
      if (JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry)) {
        // The broker connection is unusable: stop its timer, drop its endpoints and start over
        // with a fresh entry.
        brokerConnectionEntry.getConnectionTimer().cancelTimer();
        invalidateConnectionAndEndpoints(brokerConnectionEntry);
        brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
        startInactivityReaperTimer = true;
      }
    } else {
      brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
      startInactivityReaperTimer = true;
    }
    if (startInactivityReaperTimer) {
      long inactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
      brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
      // Start the FixedRate timer which wakes up at regular intervals defined by the
      // inactivity timeout. The purpose is to find inactive jms sessions. All sessions found
      // to be inactive will be closed.
      brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(
              getAnalysisEngineController().getComponentName());
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.FINE,
              CLASS_NAME.getName(),
              "getEndpointConnection",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_acquiring_connection_to_endpoint__FINE",
              new Object[] { getAnalysisEngineController().getComponentName(), destination,
                  brokerConnectionURL });
    }
    // check the cache first
    if (!brokerConnectionEntry.endpointExists(key)) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "getEndpointConnection",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_create_new_connection__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(), destination,
                    brokerConnectionURL });
      }
      Endpoint masterEndpoint = null;
      if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
        // Check if the endpoint has previously FAILED. It may have been marked as FAILED
        // due to a temp queue listener shutdown caused by a Broker failure. In such case,
        // before sending a message to a remote delegate we need to start a new instance
        // of a listener which creates a new temp reply queue.
        if (!anEndpoint.isReplyEndpoint()) { // we are not sending a reply to a client
          // The masterEndpoint has the most current state. The 'anEndpoint' instance passed
          // into this method is a clone from the master made at the beginning of processing.
          // The master endpoint may have been marked as FAILED after the clone was made.
          masterEndpoint = ((AggregateAnalysisEngineController) getAnalysisEngineController())
                  .lookUpEndpoint(anEndpoint.getDelegateKey(), false);
          if (masterEndpoint != null) {
            // Only one thread at a time is allowed here.
            synchronized (masterEndpoint) {
              if (masterEndpoint.getStatus() == Endpoint.FAILED) {
                String replyDestinationName = anEndpoint.getDestination().toString();
                // Returns InputChannel if the Reply Listener for the delegate has previously
                // failed. If the listener hasn't failed getReplyInputChannel returns null.
                InputChannel iC =
                        getAnalysisEngineController().getReplyInputChannel(replyDestinationName);
                if (iC != null) {
                  try {
                    // Create a new Listener, new Temp Queue and associate the listener with
                    // the Input Channel. Also resets endpoint status to OK.
                    iC.createListener(anEndpoint.getDelegateKey(), anEndpoint);
                    iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());
                  } catch (Exception exx) {
                    throw new AsynchAEException(exx);
                  }
                } else {
                  throw new AsynchAEException("Aggregate:"+getAnalysisEngineController()+" Has not yet recovered a listener for delegate: "+anEndpoint.getDelegateKey());
                }
              } else if (!masterEndpoint.isFreeCasEndpoint()) {
                // In case this thread blocked while the reply queue listener was created, make
                // sure that this endpoint uses the most up-to-date reply queue destination
                anEndpoint.setDestination(masterEndpoint.getDestination());
              }
            }
          }
        }
      }
      // Connection is not in the cache: create a new connection, cache it and initialize it
      endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
              getAnalysisEngineController());
      brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_open_new_connection_to_endpoint__FINE",
                new Object[] { getDestinationName(anEndpoint), brokerConnectionURL });
      }
      // Open connection to a broker, create JMS session and MessageProducer
      endpointConnection.open();
      brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
              endpointConnection.connectionCreationTimestamp);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "getEndpointConnection",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_connection_opened_to_endpoint__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(), destination,
                    brokerConnectionURL });
      }
    } else {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "getEndpointConnection",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_reusing_existing_connection__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(), destination,
                    brokerConnectionURL });
      }
      // Retrieve connection from the connection cache
      endpointConnection = brokerConnectionEntry.getEndpointConnection(key);
      // check the state of the connection and re-open it if necessary
      if (endpointConnection != null && !endpointConnection.isOpen()) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                  "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_connection_closed_reopening_endpoint__FINE",
                  new Object[] { destination });
        }
        endpointConnection.open();
        if (endpointConnection.isOpen()) {
          brokerConnectionEntry.getConnectionTimer()
                  .setConnectionCreationTimestamp(System.nanoTime());
          if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController
                  && anEndpoint.getDelegateKey() != null) {
            Endpoint masterEndpoint =
                    ((AggregateAnalysisEngineController) getAnalysisEngineController())
                            .lookUpEndpoint(anEndpoint.getDelegateKey(), false);
            // The lookup can come back empty (it is null-checked on the create path too), so
            // only reset the status when a master endpoint was actually found.
            if (masterEndpoint != null) {
              masterEndpoint.setStatus(Endpoint.OK);
            }
          }
        }
      }
    }
  } catch (InterruptedException e) {
    // Keep the original contract of returning whatever has been obtained so far (normally
    // null), but remember the interrupt so it can be restored below.
    interrupted = true;
  } finally {
    if (permitAcquired) {
      connectionSemaphore.release();
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  return endpointConnection;
}