public synchronized void initialize()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java [740:943]


  public synchronized void initialize(Map anApplicationContext)
          throws ResourceInitializationException {
		// By default UIMA-AS registers a shutdown hook to cleanup and stop the process.
		// To disable shutdown hook, an application should define 
		// AsynchAEMessage.DisableShutdownHook property.
	if ( Objects.isNull(System.getProperty(AsynchAEMessage.DisableShutdownHook))  ) {
	    // Add ShutdownHook to make sure the connection to the
	    // broker is always closed on process exit.
	    shutdownHookThread = new Thread(new UimaASShutdownHook(this));
	    Runtime.getRuntime().addShutdownHook(shutdownHookThread);
	} else {
		System.out.println("Application disabled UIMA-AS ShutdownHook");
	    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
	  	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
	  	                "main", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
	  	                "UIMAJMS_disable_shutdown_hook__INFO");
        }
    }	
    // throws an exception if verions of UIMA-AS is not compatible with UIMA SDK
    VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client", "initialize");

    if (running) {
      throw new ResourceInitializationException(new UIMA_IllegalStateException());
    }
    reset();
    Properties performanceTuningSettings = null;

    if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ServerUri)) {
      throw new ResourceInitializationException();
    }
    if (!anApplicationContext.containsKey(UimaAsynchronousEngine.ENDPOINT)) {
      throw new ResourceInitializationException();
    }
    ResourceManager rm = null;
    if (anApplicationContext.containsKey(Resource.PARAM_RESOURCE_MANAGER)) {
      rm = (ResourceManager) anApplicationContext.get(Resource.PARAM_RESOURCE_MANAGER);
    } else {
      rm = UIMAFramework.newDefaultResourceManager();
    }
    performanceTuningSettings = new Properties();
    if (anApplicationContext.containsKey(UIMAFramework.CAS_INITIAL_HEAP_SIZE)) {
      String cas_initial_heap_size = (String) anApplicationContext
              .get(UIMAFramework.CAS_INITIAL_HEAP_SIZE);
      performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
    }
    asynchManager = new AsynchAECasManager_impl(rm);

    brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
    endpoint = (String) anApplicationContext.get(UimaAsynchronousEngine.ENDPOINT);
    
    //  Check if a placeholder is passed in instead of actual broker URL or endpoint. 
    //  The placeholder has the syntax ${placeholderName} and may be imbedded in text.
    //  A system property with placeholderName must exist for successful placeholder resolution.
    //  Throws ResourceInitializationException if placeholder is not in the System properties.
    brokerURI = replacePlaceholder(brokerURI); 
    endpoint = replacePlaceholder(endpoint); 

    clientSideJmxStats.setEndpointName(endpoint);
    int casPoolSize = 1;

    if (anApplicationContext.containsKey(UimaAsynchronousEngine.CasPoolSize)) {
      casPoolSize = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CasPoolSize))
              .intValue();
      clientSideJmxStats.setCasPoolSize(casPoolSize);
    }
//    if ( anApplicationContext.containsKey(UimaAsynchronousEngine.TargetSelectorProperty) ) {
//        serviceTargetSelector = 
//        		(String)anApplicationContext.get(UimaAsynchronousEngine.TargetSelectorProperty);
//    }
    
    if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
      processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout))
              .intValue();
    }

    if (anApplicationContext.containsKey(UimaAsynchronousEngine.GetMetaTimeout)) {
      metadataTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.GetMetaTimeout))
              .intValue();
    }

    if (anApplicationContext.containsKey(UimaAsynchronousEngine.CpcTimeout)) {
      cpcTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.CpcTimeout))
              .intValue();
    }
    if (anApplicationContext.containsKey(UimaAsynchronousEngine.ApplicationName)) {
      applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName);
    }
    if (anApplicationContext.containsKey(UimaAsynchronousEngine.SERIALIZATION_STRATEGY)) {
      final String serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SERIALIZATION_STRATEGY);
      // change this to support compressed filitered as the default
      setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.BINARY);
//      setSerialFormat((serializationStrategy.equalsIgnoreCase("xmi")) ? SerialFormat.XMI : SerialFormat.COMPRESSED_FILTERED);
      clientSideJmxStats.setSerialization(getSerialFormat());
    }
    if (anApplicationContext.containsKey(UimaAsynchronousEngine.userName)) {
        amqUser = (String) anApplicationContext
                .get(UimaAsynchronousEngine.userName);
    }
    if (anApplicationContext.containsKey(UimaAsynchronousEngine.password)) {
    	amqPassword = (String) anApplicationContext
    	.get(UimaAsynchronousEngine.password);
    }

    if (anApplicationContext.containsKey(UimaAsynchronousEngine.TimerPerCAS)) {
        timerPerCAS = ((Boolean) anApplicationContext.get(UimaAsynchronousEngine.TimerPerCAS))
                .booleanValue();
      }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
      UIMAFramework.getLogger(CLASS_NAME)
              .logrb(
                      Level.CONFIG,
                      CLASS_NAME.getName(),
                      "initialize",
                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_init_uimaee_client__CONFIG",
                      new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout,
                          cpcTimeout,timerPerCAS });
    }
    super.serviceDelegate = new ClientServiceDelegate(endpoint, applicationName, this);
    super.serviceDelegate.setCasProcessTimeout(processTimeout);
    super.serviceDelegate.setGetMetaTimeout(metadataTimeout);
    try {
      // Generate unique identifier
      String uuid = UUIDGenerator.generate();
      // JMX does not allow ':' in the ObjectName so replace these with underscore
      uuid = uuid.replaceAll(":", "_");
      uuid = uuid.replaceAll("-", "_");
      applicationName += "_" + uuid;
      jmxManager = new JmxManager("org.apache.uima");
      clientSideJmxStats.setApplicationName(applicationName);
      clientJmxObjectName = new ObjectName("org.apache.uima:name=" + applicationName);
      jmxManager.registerMBean(clientSideJmxStats, clientJmxObjectName);

      // Check if sharedConnection exists. If not create a new one. The sharedConnection
      // is static and shared by all instances of UIMA AS client in a jvm. The check
      // is made in a critical section by first acquiring a global static semaphore to
      // prevent a race condition.
      createSharedConnection(brokerURI);
      running = true;
      //  This is done to give the broker enough time to 'finalize' creation of
      //  temp reply queue. It's been observed (on MAC OS only) that AMQ
      //  broker QueueSession.createTemporaryQueue() call is not synchronous. Meaning,
      //  return from createTemporaryQueue() does not guarantee immediate availability
      //  of the temp queue. It seems like this operation is asynchronous, causing: 
      //  "InvalidDestinationException: Cannot publish to a deleted Destination..."
      //  on the service side when it tries to reply to the client.
      try {
        wait(100);
      } catch( InterruptedException e) {}
      sendMetaRequest();
      waitForMetadataReply();
      if (abort || !running) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_aborting_as_WARNING", new Object[] { "Metadata Timeout" });
        }
        throw new ResourceInitializationException(new UimaASMetaRequestTimeout());
      } else {
        if (collectionReader != null) {
          asynchManager.addMetadata(collectionReader.getProcessingResourceMetaData());
        }

        asynchManager.initialize(casPoolSize, "ApplicationCasPoolContext",
                performanceTuningSettings);

        // Create a special CasPool of size 1 to be used for deserializing CASes from a Cas
        // Multiplier
        if (super.resourceMetadata != null
                && super.resourceMetadata instanceof AnalysisEngineMetaData) {
          if (((AnalysisEngineMetaData) super.resourceMetadata).getOperationalProperties()
                  .getOutputsNewCASes()) {
            // Create a Shadow CAS Pool used to de-serialize CASes produced by a CAS Multiplier
            asynchManager.initialize(1, SHADOW_CAS_POOL, performanceTuningSettings);
          }
        }
        initialized = true;
        remoteService = true;
        // running = true;

        for (int i = 0; listeners != null && i < listeners.size(); i++) {
          ((UimaASStatusCallbackListener) listeners.get(i)).initializationComplete(null);
        }
      }

    } catch (ResourceInitializationException e) {
      state = ClientState.FAILED;
      notifyOnInitializationFailure(e);
      throw e;
    } catch (Exception e) {
      state = ClientState.FAILED;
      notifyOnInitializationFailure(e);
      throw new ResourceInitializationException(e);
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO",
              new Object[] { UimaAsynchronousEngine.SERIALIZATION_STRATEGY });
    }
    // Acquire cpcReady semaphore to block sending CPC request until
    // ALL outstanding CASes are received.
    super.acquireCpcReadySemaphore();
    state = ClientState.RUNNING;
  }