public void start()

in uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java [200:394]


	/**
	 * Starts this job process (JP) and blocks until its worker threads have exited.
	 *
	 * <p>Sequence: validate the descriptor argument, register the investment-reset
	 * callback, notify the agent that the process is initializing, initialize the
	 * process container through reflection, connect to the JD, start one
	 * {@code HttpWorkerThread} per scaleout slot, wait for all of them to finish
	 * initializing, report {@code Running} (unless already in a terminal state),
	 * then wait for every worker to exit. Cleanup (executor shutdown, optional
	 * container stop, {@code stop()}) always runs afterwards.
	 *
	 * <p>Any exception raised after {@code super.start(...)} is logged, reported to
	 * the agent as {@code FailedInitialization}, and followed by {@code stop()};
	 * it is not rethrown.
	 *
	 * @param service the service on which the investment-reset callback is registered
	 * @param args    process arguments; {@code args[0]} must be a non-blank descriptor
	 *                (DD for "uima-as" jobs, AE descriptor for "uima" jobs). For
	 *                "user" jobs the array is handed to the container unchanged.
	 * @throws Exception if {@code super.start(...)} fails, or if the failure handling
	 *                   itself ({@code setState}/{@code stop}) throws
	 */
	public void start(DuccService service, String[] args) throws Exception {
		super.start(service, args);

		try {
			if (args == null || args.length == 0 || args[0] == null || args[0].trim().length() == 0) {
				String missingDescriptor = "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs";
				logger.warn("start", null, missingDescriptor);
				throw new RuntimeException(missingDescriptor);
			}

			// If the JP thread count is defaulted (property absent) the DD or
			// pieces-parts job will deduce it.
			String jpThreadCount = System.getProperty(FlagsHelper.Name.JpThreadCount.pname());

			// This class implements resetInvestment(String); register it as the
			// handler for investment reset requests.
			Method resetInvestmentMethod = this.getClass().getDeclaredMethod("resetInvestment", String.class);
			service.registerInvestmentResetCallback(this, resetInvestmentMethod);

			String processJmxUrl = super.getProcessJmxUrl();
			// The second value is labelled as an environment variable, so it is read
			// from the process environment rather than from the JVM system properties.
			logger.info("start", null, "-Dducc.deploy.JpUniqueId="
					+ System.getProperty(IDuccUser.DashD.DUCC_ID_PROCESS_UNIQUE.value())
					+ " Environment Var:DUCC_PROCESS_UNIQUEID="
					+ System.getenv(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value()));

			// tell the agent that this process is initializing
			agent.notify(ProcessState.Initializing, processJmxUrl);

			try {
				executor = new ScheduledThreadPoolExecutor(1);
				executor.prestartAllCoreThreads();
				// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
				// This monitor checks if the AE is initializing or ready.
				JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
				// Run the monitor every 30 seconds after an initial delay of
				// 20 seconds. It polls the initialization status of the AE.
				executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);

				// the JobProcessConfiguration class already checked for
				// existence of -DDucc.Job.Type
				String jobType = System.getProperty(FlagsHelper.Name.JpType.pname());

				// Set the initialize args for the appropriate container.
				// Include the specified pipeline count ... if not defined the container will determine it.
				// UIMA-5428 If the DD generated by the JD is not accessible revert to the one specified
				// by the user ... it will be converted to a temporary file by the JP
				String[] jpArgs;
				if ("uima-as".equals(jobType)) {
					uimaASJob = true; // dd - deployment descriptor. Will use UIMA-AS
					if (!new File(args[0]).canRead()) {
						String userdd = FlagsHelper.getInstance().getJpDd();
						getLogger().info("start", null, "Replacing inaccessible DD "+args[0]+" by the user specified "+userdd);
						args[0] = userdd;
					}
					jpArgs = new String[] { "-dd", args[0], "-saxonURL", saxonJarPath, "-xslt", dd2SpringXslPath, "-t", jpThreadCount };
				} else if ("uima".equals(jobType)) {
					// aed - analysis engine descriptor. Will use UIMA core only
					jpArgs = new String[] { "-aed", args[0], "-t", jpThreadCount };
				} else if ("user".equals(jobType)) {
					jpArgs = args;
				} else {
					throw new RuntimeException(
							"Unsupported JP deployment mode. Check a value provided for -D"
									+ FlagsHelper.Name.JpType.pname()
									+ ". Supported modes: [uima-as|uima|user]");
				}

				Properties props = new Properties();
				// Using java reflection, initialize instance of IProcessContainer.
				// The value returned by initialize() is used as the worker thread count.
				Method initMethod = processorInstance.getClass().getSuperclass()
						.getDeclaredMethod("initialize", Properties.class, String[].class);
				int scaleout = (Integer) initMethod.invoke(processorInstance, props, jpArgs);

				getLogger().info("start", null,"Ducc JP JobType="+jobType);
				httpClient = new DuccHttpClient(this);
				String jdURL = "";
				try {
					jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
					// Test the connection and fail if unable to connect.
					// Gets the url from the registry if not in the system properties
					httpClient.initialize(jdURL);
					logger.info("start", null,"The JP Connected To JD Using URL "+httpClient.getJdUrl());
				} catch (Exception ee) {
					// A connect failure may be thrown directly or wrapped by the http
					// client, so inspect the exception and its whole cause chain.
					boolean connectFailure = false;
					for (Throwable t = ee; t != null && !connectFailure; t = t.getCause()) {
						connectFailure = t instanceof java.net.ConnectException;
					}
					if (connectFailure) {
						logger.error("start", null, "JP Process Unable To Connect to the JD Using Provided URL:"+jdURL+" Unable to Continue - Shutting Down JP");
					}
					throw ee;
				}

				// Setup latch which will be used to determine if worker threads
				// initialized properly. The threads will not fetch WIs from the JD
				// until the latch is open (all threads complete initialization)
				threadReadyCount = new CountDownLatch(scaleout);
				// Setup Thread Factory
				UimaServiceThreadFactory tf =
						new UimaServiceThreadFactory(Thread.currentThread().getThreadGroup());
				workerThreadCount = new CountDownLatch(scaleout);
				// Setup Thread pool with thread count = scaleout
				tpe = Executors.newFixedThreadPool(scaleout, tf);

				getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);

				// Create and start worker threads that pull Work Items from the JD
				Future<?>[] threadHandles = new Future<?>[scaleout];
				for (int j = 0; j < scaleout; j++) {
					threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount, transactionMap, maxFrameworkFailures));
				}
				// wait until all process threads initialize
				threadReadyCount.await();

				// if initialization was successful, tell the agent that the JP is running
				if (!isInTerminalState()) {
					setState(ProcessState.Running, processJmxUrl);
					// Stop polling for AE state. All AEs have initialized. No need
					// to poll.
					try {
						monitor.updateAgentWhenRunning(); // force final publication
						executor.shutdown();
					} catch (Exception ee) {
						logger.error("start", null,ee);
					}
				}
				for (Future<?> future : threadHandles) {
					future.get(); // wait for each worker thread to exit run()
				}
			} catch (Exception ee) {
				logger.error("start", null,ee);
				getLogger().info("start", null, ">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
				setState(ProcessState.FailedInitialization);
			} finally {
				// Stop the executor. It was only needed to poll AE initialization
				// status; whether initialization succeeded or failed, polling is
				// no longer needed.
				if (executor != null) {
					executor.shutdownNow();
				}
				if (tpe != null) {
					tpe.shutdown();
					// Zero timeout: does not block here; the latch below is what
					// actually waits for the workers.
					tpe.awaitTermination(0, TimeUnit.MILLISECONDS);
				}

				if (workerThreadCount != null) {
					workerThreadCount.await();

					// Determine if the process container requires thread affinity to AE instance.
					// If it does, the worker thread has already called stop() which in
					// turn called AE.destroy(). If the process container has no thread
					// affinity, call stop() here to make sure the cleanup code shuts down
					// internal components.
					Method useThreadAffinityMethod = processorInstance.getClass().getDeclaredMethod("useThreadAffinity");
					boolean useThreadAffinity = (Boolean) useThreadAffinityMethod.invoke(processorInstance);
					// if the container has thread affinity, the stop method must be
					// called by the same thread that called initialize() and process().
					// Such container's stop() is called in the Worker Thread.
					if (!useThreadAffinity) {
						Method stopMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("stop");
						stopMethod.invoke(processorInstance);
					}
				}
				stop();
			}
		} catch (Exception e) {
			// Covers failures before the inner try (argument validation, callback
			// registration, agent notification) and anything thrown by its finally block.
			setState(ProcessState.FailedInitialization);

			logger.error("start", null,e);
			stop();
		}

	}