in uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java [200:394]
public void start(DuccService service, String[] args) throws Exception {
super.start(service, args);
try {
if ( args == null || args.length ==0 || args[0] == null || args[0].trim().length() == 0) {
logger.warn("start", null, "Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
throw new RuntimeException("Missing Deployment Descriptor - the JP Requires argument. Add DD for UIMA-AS job or AE descriptor for UIMA jobs");
}
// If the JP thread count is defaulted the DD or pieces-parts job will deduce it.
String jpThreadCount = System.getProperty(FlagsHelper.Name.JpThreadCount.pname());
// this class implements resetInvestment method
Method m = this.getClass().getDeclaredMethod("resetInvestment", String.class);
// register this class and its method to handle investment reset
service.registerInvestmentResetCallback(this, m);
String processJmxUrl = super.getProcessJmxUrl();
logger.info("start", null, "-Dducc.deploy.JpUniqueId=" +
System.getProperty(IDuccUser.DashD.DUCC_ID_PROCESS_UNIQUE.value()) +
" Environment Var:DUCC_PROCESS_UNIQUEID=" +
System.getProperty(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value()));
// tell the agent that this process is initializing
agent.notify(ProcessState.Initializing, processJmxUrl);
try {
executor = new ScheduledThreadPoolExecutor(1);
executor.prestartAllCoreThreads();
// Instantiate a UIMA AS jmx monitor to poll for status of the AE.
// This monitor checks if the AE is initializing or ready.
JmxAEProcessInitMonitor monitor = new JmxAEProcessInitMonitor(agent);
/*
* This will execute the UimaAEJmxMonitor continuously for every 15
* seconds with an initial delay of 20 seconds. This monitor polls
* initialization status of AE deployed in UIMA AS.
*/
executor.scheduleAtFixedRate(monitor, 20, 30, TimeUnit.SECONDS);
// the JobProcessConfiguration class already checked for
// existence of -DDucc.Job.Type
String jobType = System.getProperty(FlagsHelper.Name.JpType.pname());
// Set the initialize args for the appropriate container
// Include the specified pipeline count ... if not defined the container will determine it
// UIMA-5428 If the DD generated by the JD is not accessible revert to the one specified
// by the user ... it will be converted to a temporary file by the JP
String[] jpArgs;
if ("uima-as".equals(jobType)) {
uimaASJob = true; // dd - deployment descriptor. Will use UIMA-AS
if (!new File(args[0]).canRead()) {
String userdd = FlagsHelper.getInstance().getJpDd();
getLogger().info("start", null, "Replacing inaccessible DD "+args[0]+" by the user specified "+userdd);
args[0] = userdd;
}
jpArgs = new String[] { "-dd", args[0], "-saxonURL", saxonJarPath, "-xslt", dd2SpringXslPath, "-t", jpThreadCount };
} else if ("uima".equals(jobType)) {
// aed - analysis engine descriptor. Will use UIMA core only
jpArgs = new String[] { "-aed", args[0], "-t", jpThreadCount };
} else if ("user".equals(jobType)) {
jpArgs = args;
} else {
throw new RuntimeException(
"Unsupported JP deployment mode. Check a value provided for -D"
+ FlagsHelper.Name.JpType.pname()
+ ". Supported modes: [uima-as|uima|user]");
}
Properties props = new Properties();
// Using java reflection, initialize instance of IProcessContainer
Method initMethod = processorInstance.getClass().getSuperclass().
getDeclaredMethod("initialize", Properties.class, String[].class);
int scaleout = (Integer)initMethod.invoke(processorInstance, props, jpArgs);
getLogger().info("start", null,"Ducc JP JobType="+jobType);
httpClient = new DuccHttpClient(this);
String jdURL = "";
try {
jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
// Test the connection and fail if unable to connect
// Gets the url from the registry if not in the system properties
httpClient.initialize(jdURL);
logger.info("start", null,"The JP Connected To JD Using URL "+httpClient.getJdUrl());
} catch( Exception ee ) {
if ( ee.getCause() != null && ee instanceof java.net.ConnectException ) {
logger.error("start", null, "JP Process Unable To Connect to the JD Using Provided URL:"+jdURL+" Unable to Continue - Shutting Down JP");
}
throw ee;
}
// Setup latch which will be used to determine if worker threads
// initialized properly. The threads will not fetch WIs from the JD
// until the latch is open (all threads complete initialization)
threadReadyCount = new CountDownLatch(scaleout);
// Setup Thread Factory
UimaServiceThreadFactory tf = new UimaServiceThreadFactory(Thread
.currentThread().getThreadGroup());
workerThreadCount = new CountDownLatch(scaleout);
// Setup Thread pool with thread count = scaleout
tpe = Executors.newFixedThreadPool(scaleout, tf);
// initialize http client's timeout
//httpClient.setTimeout(timeout);
getLogger().info("start", null, "Starting "+scaleout+" Process Threads - JMX Connect String:"+ processJmxUrl);
// Create and start worker threads that pull Work Items from the JD
Future<?>[] threadHandles = new Future<?>[scaleout];
for (int j = 0; j < scaleout; j++) {
threadHandles[j] = tpe.submit(new HttpWorkerThread(this, httpClient, processorInstance, workerThreadCount, threadReadyCount, transactionMap, maxFrameworkFailures));
}
// wait until all process threads initialize
threadReadyCount.await();
// if initialization was successful, tell the agent that the JP is running
// if ( !currentState.equals(ProcessState.FailedInitialization )) {
if ( !isInTerminalState() ) {
setState(ProcessState.Running, processJmxUrl);
/*
// pipelines deployed and initialized. This process is Ready
currentState = ProcessState.Running;
// Update agent with the most up-to-date state of the pipeline
// all is well, so notify agent that this process is in Running state
agent.notify(currentState, processJmxUrl);
*/
// Stop polling for AE state. All AEs have initialized. No need
// to poll.
try {
monitor.updateAgentWhenRunning(); // force final publication
executor.shutdown();
} catch( Exception ee) {
logger.error("start", null,ee);
}
}
for( Future<?> future : threadHandles ) {
future.get(); // wait for each worker thread to exit run()
}
} catch( Exception ee) {
logger.error("start", null,ee);
getLogger().info("start", null, ">>> Failed to Deploy UIMA Service. Check UIMA Log for Details");
/*
currentState = ProcessState.FailedInitialization;
agent.notify(ProcessState.FailedInitialization);
*/
setState(ProcessState.FailedInitialization);
} finally {
// Stop executor. It was only needed to poll AE initialization status.
// Since deploy() completed
// the UIMA AS service either succeeded initializing or it failed. In
// either case we no longer
// need to poll for initialization status
if ( executor != null ) {
executor.shutdownNow();
}
if ( tpe != null ) {
tpe.shutdown();
tpe.awaitTermination(0, TimeUnit.MILLISECONDS);
}
if ( workerThreadCount != null ) {
workerThreadCount.await();
// Determine if the process container requires thread affinity to AE instance.
// If it does, the worker thread has already called stop() which in
// turn called AE.destroy(). If the process container has no thread
// affinity, call stop() here to make sure the cleanup code shuts down
// internal components.
Method useThreadAffinityMethod = processorInstance.getClass().getDeclaredMethod("useThreadAffinity");
boolean useThreadAffinity =
(Boolean)useThreadAffinityMethod.invoke(processorInstance);
// if the container has thread affinity, the stop method must be
// called by the same thread that called initialize() and process().
// Such container's stop() is called in the Worker Thread.
if ( !useThreadAffinity) {
Method stopMethod = processorInstance.getClass().getSuperclass().getDeclaredMethod("stop");
stopMethod.invoke(processorInstance);
}
// Stop process container
//Method stopMethod = processorInstance.getClass().getDeclaredMethod("stop");
//stopMethod.invoke(processorInstance);
}
stop();
}
} catch( Exception e) {
/*
currentState = ProcessState.FailedInitialization;
agent.notify(currentState);
*/
setState(ProcessState.FailedInitialization);
logger.error("start", null,e);
stop();
}
}