public void run()

in uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java [141:486]


	public void run() {
		// when this thread looses connection to its JD, log error once
		boolean logConnectionToJD = true;
		HttpPost postMethod = null;
	    logger.info("HttpWorkerThread.run()", null, "Starting JP Process Thread Id:"+Thread.currentThread().getId());
	    Method processMethod = null;
	    Method getKeyMethod = null;
	    boolean error=false;
	    // ***** DEPLOY ANALYTICS ***********
	    // First, deploy analytics in a provided process container. Use java reflection to call
	    // deploy method. The process container has been instantiated in the main thread and
	    // loaded from ducc-user j      ar provided in system classpath
	    try {
			processMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("process", Object.class);	
			getKeyMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("getKey", String.class);	
			
			synchronized(HttpWorkerThread.class) {
				Method deployMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("deploy");
				deployMethod.invoke(processorInstance);
				logger.info("HttpWorkerThread.run()", null,".... Deployed Processing Container - Initialization Successful - Thread "+Thread.currentThread().getId());
			}

			// each thread needs its own PostMethod
			postMethod = new HttpPost(httpClient.getJdUrl());
			// Set request timeout
			//postMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, duccComponent.getTimeout());
	   	} catch( Throwable t) {
	   		error = true;
	   		synchronized(JobProcessComponent.class) {
	   			// send notification to an agent 
	   			duccComponent.setState(ProcessState.FailedInitialization);
			}
	   		logger.error("HttpWorkerThread.run()", null, t);
	   		logger.info("HttpWorkerThread.run()", null,"EXITING WorkThread ID:"
					+ Thread.currentThread().getId());
	   		logger.warn("HttpWorkerThread.run()", null, "The Job Process Terminating Due To Initialization Error");
			/* *****************************************/
			/* *****************************************/
			/* *****************************************/
        	/*       EXITING  PROCESS ON FIRST ERROR   */
			/* *****************************************/
	   		try {
	   			// allow agent some time to process FailedInitialization event 
	   			Thread.sleep(2000);
	   		} catch( Exception e) {}
	//   		System.exit(1);
			/* *****************************************/
			/* *****************************************/
			/* *****************************************/
			/* *****************************************/
	   		return;  // non-recovorable error
	   	} finally {
			// count down the latch. Once all threads deploy and initialize their analytics the processing
			// may being
			threadReadyCount.countDown();  // this thread is ready
			// **************************************************************************
			// now block and wait until all threads finish deploying and initializing 
			// analytics in provided process container. Processing begins when
			// all worker threads initialize their analytics.
			// **************************************************************************
			try {
				threadReadyCount.await();   // wait for all analytics to initialize
			} catch( Exception ie) {}

			if (!error) {
				synchronized(JobProcessComponent.class) {
					// change the state of this process and notify
					// Ducc agent that the process is ready and running
					duccComponent.setState(ProcessState.Running);
				}
			}
	   		
	   	}
			
			
	   	logger.info("HttpWorkerThread.run()", null, "Begin Processing Work Items - Thread Id:"+Thread.currentThread().getId());
		try {
			IMetaTaskTransaction transaction=null;
			int major = 0;
			int minor = 0;
			// Enter process loop. Stop this thread on the first process error.
			while (duccComponent.isRunning()) {  

				try {
					major = IdGenerator.addAndGet(1);
					minor = 0;
					// the getWork() may block if connection is lost. 
					transaction = getWork(postMethod, major, minor);
					// first check if we are still running
					if ( !duccComponent.isRunning() ) {
    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
    					break;
					}
					if ( !logConnectionToJD ) {
						logConnectionToJD = true;   // reset flag in case we loose connection to JD in the future
						logger.info("run", null, "T["+Thread.currentThread().getId()+"] - Regained Connection to JD");
					}
                    
					// If the client did not provide a Work Item, reduce frequency of Get requests
					// by sleeping in between Get's. Synchronize so only one thread is polling for work

                    // if the JD did not provide a Work Item, most likely the CR is
					// done. In such case, reduce frequency of Get requests
					// by sleeping in between Get's. Eventually the OR will 
					// deallocate this process and the thread will exit
					if ( transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null) {
					  logger.info("run", null, "Client is out of work - will retry quietly every",duccComponent.getThreadSleepTime()/1000,"secs.");
    					
    			  // Retry at the start of this block as another thread may have just exited with work
    				// so the TAS (or JD) may now have a lot of work.	
 						synchronized (HttpWorkerThread.class) {
							while(duccComponent.isRunning() ) {
							  transaction = getWork(postMethod, major, ++minor);
							  if ( transaction.getMetaTask() != null && transaction.getMetaTask().getUserSpaceTask() != null ) {
							    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" work flow has restarted");
							    break;
							  }
								waitAwhile(duccComponent.getThreadSleepTime());
							}
						}
						
					} 
					if ( duccComponent.isRunning()) {
						boolean workItemFailed = false;
						// process the Work item. Any exception here will cause the 
						// thread to terminate and also the JP to stop. The stopping
						// is orderly allowing each thread to finish processing of
						// the current WI. Once the JP notifies the Agent of a problem
						// the Agent will wait for 1 minute (default) before killing
						// this process via kill -9
						try {
							// To support investment reset we need to store transaction
							// object under a known key. This key is stored in the CAS.
							// In order to get to it, we need to deserialize the CAS 
							// in the user container. When an asynchronous investment
							// reset call is made from the user code, it will contain
							// that key to allow us to look up original transaction so that
							// we can send reset request to the JD.
							String key = (String)
									getKeyMethod.invoke(processorInstance, transaction.getMetaTask().getUserSpaceTask());
							if ( key != null ) {
								// add transaction under th
								transactionMap.put(key, transaction);
							}
							// make sure the JP is in the Running state before calling process()
							if ( !duccComponent.isRunning() ) {
								break;
							}
							//    ********** PROCESS() **************
							// using java reflection, call process to analyze the CAS. While 
							// we are blocking, user code may issue investment reset asynchronously.
							 List<Properties> metrics = (List<Properties>)processMethod.
							   invoke(processorInstance, transaction.getMetaTask().getUserSpaceTask());
							//    ***********************************
							if ( key != null ) {
                                // process ended we no longer expect investment reset from user
								// so remove transaction from the map
								transactionMap.remove(key);
							}
							
		                    logger.debug("run", null,"Thread:"+Thread.currentThread().getId()+" process() completed");
							//PerformanceMetrics metricsWrapper =
							//		new PerformanceMetrics();
						//	metricsWrapper.set(metrics);
							IServiceResultSerializer deserializer =
									new UimaResultDefaultSerializer();
							
							/*
							 				p.setProperty("name", metrics.getName());
				p.setProperty("uniqueName", metrics.getUniqueName());
				p.setProperty("analysisTime",
						String.valueOf(metrics.getAnalysisTime()));
				p.setProperty("numProcessed",
						String.valueOf(metrics.getNumProcessed()));

							 */
							List<PerformanceMetrics> pmList = new ArrayList<PerformanceMetrics>();
							for( Properties p : metrics) {
								PerformanceMetrics pm = 
										new PerformanceMetrics(p.getProperty("name"), p.getProperty("uniqueName"), Long.parseLong(p.getProperty("analysisTime")),0);
								pmList.add(pm);
							}
							
							transaction.getMetaTask().setPerformanceMetrics(deserializer.serialize(pmList));
							
						}  catch( InvocationTargetException ee) {
							
							logger.error("run", null, ee);
							// This is process error. It may contain user defined
							// exception in the stack trace. To protect against
						    // ClassNotFound, the entire stack trace was serialized.
							// Fetch the serialized stack trace and pass it on to
							// to the JD. The actual serialized stack trace is wrapped in
							// RuntimeException->AnalysisEngineException.message
							workItemFailed = true;
							// if WI processing fails while the service changes states to !Running
							// ignore results and terminate this thread.
							if ( !duccComponent.isRunning() ) {
								break;
							}
							IMetaTask mc = transaction.getMetaTask();
							//byte[] serializedException = null;
							Method getLastSerializedErrorMethod = processorInstance.getClass().getDeclaredMethod("getLastSerializedError");
							byte[] serializedException =
							    (byte[])getLastSerializedErrorMethod.invoke(processorInstance);
			
							mc.setUserSpaceException(serializedException);								

							logger.info("run", null, "Work item processing failed - returning serialized exception to the JD");
						} catch( Exception ee) {
							workItemFailed = true;
							// if WI processing fails while the service changes states to !Running
							// ignore results and terminate this thread.
							if ( !duccComponent.isRunning() ) {
								logger.info("run", null, "Work item processing failed - terminating thread - ignore any AE errors that may happen beyond this point");
								break;
							}
							// Serialize exception for the JD.
							byte[] serializedException = serializeException(ee);
							/*
							ByteArrayOutputStream baos = new ByteArrayOutputStream();
						    ObjectOutputStream oos = new ObjectOutputStream( baos );
						    oos.writeObject( ee);
						    oos.close();
							transaction.getMetaCas().setUserSpaceException(baos.toByteArray());
						    */
							logger.error("run", null, ee);
							transaction.getMetaTask().setUserSpaceException(serializedException);								
						}
						// Dont return serialized CAS to reduce the msg size
						transaction.getMetaTask().setUserSpaceTask(null);
						transaction.setType(Type.End);
						//String command = Type.End.name();
						
						minor++; // getWork()  
						TransactionId tid = new TransactionId(major, minor++);
						transaction.setTransactionId(tid);

						// if WI processing fails while the service changes states to !Running
						// ignore results and terminate this thread.
						if ( !duccComponent.isRunning() ) {
							break;
						}

						httpClient.execute(transaction, postMethod); // Work Item Processed - End
						// the execute() can block while recovering lost connection.
						// first check if we are still running.
						if ( !duccComponent.isRunning() ) {
	    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Process is Stopping - Terminating This Thread");
	    					break;
						}

						String wid = null;
                    	try {
                    		wid = transaction.getMetaTask().getSystemKey();
                    	} catch( Exception e) {
                    		
                    	}
	                    logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" sent END for WI:"+wid);
	                    if ( exitOnProcessFailure && workItemFailed ) {
	                        if ( wid != null ) {
		                    	logger.warn("run", null, "Worker thread exiting due to error while processing WI:"+wid);
	                        } else {
		                    	logger.warn("run", null, "Worker thread exiting due to error while processing a WI");
	                        }
	        				logger.info("run", null, "JP Terminating Due to WI Error - Notify Agent");

        					// send an update to the agent.
	                        duccComponent.setState(ProcessState.Stopping, ReasonForStoppingProcess.ExceededErrorThreshold.toString());
        					// sleep for awhile to let the agent handle 
	                        // Stopping event. 
	                        // Reason for the sleep: there may be a race condition
	                        // here, where the JP sends a Stopping event to 
	                        // its agent and immediately exits. Before the
	                        // agent finishes handling of Stopping event its
	                        // internal thread detects process termination 
	                        // and may mark the JP as croaked. Sleep should
	                        // reduce the risk of this race but there is still 
	                        // a chance that agent doesn't handle Stopping
	                        // event before it detects JP terminating. Unlikely
	                        // but theoretically possible.
	                        try {
    	                        Thread.sleep(3000);
        					} catch( InterruptedException e) {}
        					/* *****************************************/
        					/* *****************************************/
        					/* *****************************************/
                        	/*       EXITING  PROCESS ON FIRST ERROR   */
        					/* *****************************************/
        					logger.warn("run", null,"Terminating Job Process - Work Item Failed");

        					// Stop the JVM hard. 
        					Runtime.getRuntime().halt(-1);
        					/* *****************************************/
        					/* *****************************************/
        					/* *****************************************/
        					/* *****************************************/

                        	break;
                        }
	                    maxFrameworkFailures.set(maxFrameworkErrors);   // reset framework failures on success
					} 
				} catch( InterruptedException e) {
					logger.error("run", null, "WorkerThread Interrupted - Terminating Thread "+Thread.currentThread().getId());
					return;
				} catch (Exception e ) {
					logger.error("run", null, e);
					// If max framework error count has been reached 
					// just exit the process
					if ( maxFrameworkFailures.decrementAndGet() <= 0 ) {
						logger.error("run", null, "The Job Process Terminating Due To a Framework Error");
						Runtime.getRuntime().halt(-1);
					}
				} finally {

				}

			}

		} catch (Throwable t) {
			logger.error("run", null, t);
		} finally {
			logger.warn("run",null,"EXITING WorkThread ID:"
					+ Thread.currentThread().getId());
		    try {
		    	// Determine if the Worker thread has thread affinity to specific AE
		    	// instance. This depends on the process container. If this process
		    	// uses pieces part (not DD), than the thread should call stop on
		    	// process container which will than destroy the AE. User code may
		    	// store stuff in ThreadLocal and use it in the destroy method.
		    	Method useThreadAffinityMethod = processorInstance.getClass().getDeclaredMethod("useThreadAffinity");	
				boolean useThreadAffinity =
						(Boolean)useThreadAffinityMethod.invoke(processorInstance);
				if ( useThreadAffinity) {
					Method stopMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("stop");
					stopMethod.invoke(processorInstance);
				}
		   	} catch( Throwable t) {
		   		logger.warn("run",null,t);
		   	} finally {
				workerThreadCount.countDown();
		   	}
		
		}

	}