in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java [740:943]
public synchronized void initialize(Map anApplicationContext)
throws ResourceInitializationException {
// By default UIMA-AS registers a shutdown hook to cleanup and stop the process.
// To disable shutdown hook, an application should define
// AsynchAEMessage.DisableShutdownHook property.
if ( Objects.isNull(System.getProperty(AsynchAEMessage.DisableShutdownHook)) ) {
// Add ShutdownHook to make sure the connection to the
// broker is always closed on process exit.
shutdownHookThread = new Thread(new UimaASShutdownHook(this));
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
} else {
System.out.println("Application disabled UIMA-AS ShutdownHook");
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_disable_shutdown_hook__INFO");
}
}
// throws an exception if verions of UIMA-AS is not compatible with UIMA SDK
VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client", "initialize");
if (running) {
throw new ResourceInitializationException(new UIMA_IllegalStateException());
}
reset();
Properties performanceTuningSettings = null;
if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
throw new ResourceInitializationException();
}
if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ENDPOINT)) {
throw new ResourceInitializationException();
}
ResourceManager rm = null;
if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
} else {
rm = UIMAFramework.newDefaultResourceManager();
}
performanceTuningSettings = new Properties();
if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
String cas_initial_heap_size = (String) anApplicationContext
.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
}
asynchManager = new AsynchAECasManager_impl(rm);
brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
// Check if a placeholder is passed in instead of actual broker URL or endpoint.
// The placeholder has the syntax ${placeholderName} and may be imbedded in text.
// A system property with placeholderName must exist for successful placeholder resolution.
// Throws ResourceInitializationException if placeholder is not in the System properties.
brokerURI = replacePlaceholder(brokerURI);
endpoint = replacePlaceholder(endpoint);
clientSideJmxStats.setEndpointName(endpoint);
int casPoolSize = 1;
if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
.intValue();
clientSideJmxStats.setCasPoolSize(casPoolSize);
}
// if ( anApplicationContext.containsKey(UimaAsynchronousEngine.TargetSelectorProperty) ) {
// serviceTargetSelector =
// (String)anApplicationContext.get(UimaAsynchronousEngine.TargetSelectorProperty);
// }
if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
.intValue();
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.SERIALIZATION_STRATEGY)) {
final String serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SERIALIZATION_STRATEGY);
// change this to support compressed filitered as the default
setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.BINARY);
// setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.COMPRESSED_FILTERED);
clientSideJmxStats.setSerialization(getSerialFormat());
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.userName)) {
amqUser = (String) anApplicationContext
.get(UimaAsynchronousEngine.userName);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.password)) {
amqPassword = (String) anApplicationContext
.get(UimaAsynchronousEngine.password);
}
if (anApplicationContext.containsKey(UimaAsynchronousEngine.TimerPerCAS)) {
timerPerCAS = ((Boolean) anApplicationContext.get(UimaAsynchronousEngine.TimerPerCAS))
.booleanValue();
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
UIMAFramework.getLogger(CLASS_NAME)
.logrb(
Level.CONFIG,
CLASS_NAME.getName(),
"initialize",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_init_uimaee_client__CONFIG",
new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
cpcTimeout,timerPerCAS });
}
super.serviceDelegate = new ClientServiceDelegate(endpoint, applicationName, this);
super.serviceDelegate.setCasProcessTimeout(processTimeout);
super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
try {
// Generate unique identifier
String uuid = UUIDGenerator.generate();
// JMX does not allow ':' in the ObjectName so replace these with underscore
uuid = uuid.replaceAll(":", "_");
uuid = uuid.replaceAll("-", "_");
applicationName += "_" + uuid;
jmxManager = new JmxManager("org.apache.uima");
clientSideJmxStats.setApplicationName(applicationName);
clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);
// Check if sharedConnection exists. If not create a new one. The sharedConnection
// is static and shared by all instances of UIMA AS client in a jvm. The check
// is made in a critical section by first acquiring a global static semaphore to
// prevent a race condition.
createSharedConnection(brokerURI);
running = true;
// This is done to give the broker enough time to 'finalize' creation of
// temp reply queue. It's been observed (on MAC OS only) that AMQ
// broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
// return from createTemporaryQueue() does not guarantee immediate availability
// of the temp queue. It seems like this operation is asynchronous, causing:
// "InvalidDestinationException: Cannot publish to a deleted Destination..."
// on the service side when it tries to reply to the client.
try {
wait(100);
} catch( InterruptedException e) {}
sendMetaRequest();
waitForMetadataReply();
if (abort || !running) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_aborting_as_WARNING", new Object[] { "Metadata Timeout" });
}
throw new ResourceInitializationException(new UimaASMetaRequestTimeout());
} else {
if (collectionReader != null) {
asynchManager.addMetadata(collectionReader.getProcessingResourceMetaData());
}
asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext",
performanceTuningSettings);
// Create a special CasPool of size 1 to be used for deserializing CASes from a Cas
// Multiplier
if (super.resourceMetadata != null
&& super.resourceMetadata instanceof AnalysisEngineMetaData) {
if (((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties()
.getOutputsNewCASes()) {
// Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
}
}
initialized = true;
remoteService = true;
// running = true;
for (int i = 0; listeners != null && i < listeners.size(); i++) {
((UimaASStatusCallbackListener) listeners.get(i)).initializationComplete(null);
}
}
} catch (ResourceInitializationException e) {
state = ClientState.FAILED;
notifyOnInitializationFailure(e);
throw e;
} catch (Exception e) {
state = ClientState.FAILED;
notifyOnInitializationFailure(e);
throw new ResourceInitializationException(e);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO",
new Object[] { UimaAsynchronousEngine.SERIALIZATION_STRATEGY });
}
// Acquire cpcReady semaphore to block sending CPC request until
// ALL outstanding CASes are received.
super.acquireCpcReadySemaphore();
state = ClientState.RUNNING;
}