in uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java [1492:1598]
public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException {
synchronized(childControllerList) {
// Add a delay of 100ms before sending requests for metadata to remote delegates.
// This is done to give the broker enough time to 'finalize' creation of
// temp reply queues for each remote delegate. It's been observed (on MAC OS only) that AMQ
// broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
// return from createTemporaryQueue() does not guarantee immediate availability
// of the temp queue. It seems like this operation is asynchronous, causing:
// "InvalidDestinationException: Cannot publish to a deleted Destination..."
// when a request is send quickly to a service before the broker 'finalizes'
// creation of the temp queue.
try {
childControllerList.wait(100);
} catch( InterruptedException e) {}
if ( childControllerList.size() > 0 ) {
for( AnalysisEngineController childController : childControllerList ) {
if (childController instanceof AggregateAnalysisEngineController) {
((AggregateAnalysisEngineController) childController)
.sendRequestForMetadataToRemoteDelegates();
}
}
}
}
Endpoint[] delegateEndpoints = new Endpoint[destinationMap.size()];
// First copy endpoints to an array so that we dont get Concurrent access problems
// in case an error handler needs to disable the endpoint.
Set keySet = destinationMap.keySet();
Iterator it = keySet.iterator();
int indx = 0;
while (it.hasNext()) {
delegateEndpoints[indx++] = (Endpoint) destinationMap.get((String) it.next());
}
// Now send GetMeta request to all remote delegates
for (int i = 0; !isStopped() && i < delegateEndpoints.length; i++) {
if (delegateEndpoints[i].isRemote()) {
delegateEndpoints[i].initialize();
delegateEndpoints[i].setController(this);
String key = lookUpDelegateKey(delegateEndpoints[i].getEndpoint());
if (key != null && destinationMap.containsKey(key)) {
Endpoint endpoint = ((Endpoint) destinationMap.get(key));
if (key != null && endpoint != null) {
ServiceInfo serviceInfo = endpoint.getServiceInfo();
PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo(endpoint.isCasMultiplier(), null);
pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
if (endpoint.getDestination() != null) {
pServiceInfo.setReplyQueueName(endpoint.getDestination().toString());
}
pServiceInfo.setServiceKey(key);
pServiceInfo.setState(serviceInfo.getState());
pServiceInfo.setAnalysisEngineInstanceCount(1);
registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain()
+ super.jmxContext + ",r" + remoteIndex + "=" + key
+ " [Remote Uima EE Service],name=" + key + "_" + serviceInfo.getLabel());
ServicePerformance servicePerformance = new ServicePerformance();
// servicePerformance.setIdleTime(System.nanoTime());
servicePerformance.setRemoteDelegate();
servicePerformance.setReplyThreadCount(endpoint.getConcurrentReplyConsumers());
registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain()
+ super.jmxContext + ",r" + remoteIndex + "=" + key
+ " [Remote Uima EE Service],name=" + key + "_" + servicePerformance.getLabel());
ServiceErrors serviceErrors = new ServiceErrors();
registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain()
+ super.jmxContext + ",r" + remoteIndex + "=" + key
+ " [Remote Uima EE Service],name=" + key + "_" + serviceErrors.getLabel());
remoteIndex++;
serviceErrorMap.put(key, serviceErrors);
Object[] delegateStatsArray = new Object[] { pServiceInfo, servicePerformance,
serviceErrors };
delegateStatMap.put(key, delegateStatsArray);
}
// If the service has stopped dont bother doing anything else. The service
// may have been stopped because listener connection could not be established.
if (isStopped()) {
return;
}
if (delegateEndpoints[i].getStatus() == Endpoint.OK ) {
dispatchMetadataRequest(delegateEndpoints[i]);
}
}
}
else {
// collocated delegate
delegateEndpoints[i].initialize();
delegateEndpoints[i].setController(this);
delegateEndpoints[i].setWaitingForResponse(true);
try {
UimaMessage message = getTransport(delegateEndpoints[i].getEndpoint()).produceMessage(
AsynchAEMessage.GetMeta, AsynchAEMessage.Request, getName());
UimaTransport transport = getTransport(delegateEndpoints[i].getEndpoint());
transport.getUimaMessageDispatcher(delegateEndpoints[i].getEndpoint()).dispatch(message);
} catch (Exception e) {
throw new AsynchAEException(e);
}
}
}
}