private int runInternal()

in uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccMonitor.java [284:525]


	/**
	 * Parses the command line and then polls the monitor URL for the requested
	 * id until a terminal condition is seen or the observer flag is cleared.
	 * <p>
	 * Each poll fetches a {@code MonitorInfo}, reports the last entry of its
	 * state sequence (only when the formatted message changes), and then sleeps
	 * for the wake-up interval. A connection failure is reported once and the
	 * poll is retried after the usual sleep. A result that is a regression from
	 * the previously accepted one is discarded, and the loop likewise sleeps
	 * before polling again.
	 *
	 * @param args the command-line arguments to parse
	 * @return {@code RC_FAILURE} if DUCC_HOME cannot be found; {@code RC_HELP}
	 *         if the arguments are illegal, help was requested, or no id option
	 *         was given; {@code RC_SUCCESS} when a reservation reaches the
	 *         assigned state or the observer flag is cleared; for a completed
	 *         state, the numeric value of the reported code when total equals
	 *         done (or {@code RC_FAILURE} if the code is not numeric), otherwise
	 *         {@code RC_FAILURE}
	 * @throws Exception any failure other than {@code ConnectException} raised
	 *         while dispatching the request is propagated to the caller
	 */
	private int runInternal(String[] args) throws Exception {
		// DUCC_HOME
		String ducc_home = Utils.findDuccHome();
		if (ducc_home == null) {
			messageProcessor
					.status("Missing required environment variable: DUCC_HOME");
			return RC_FAILURE;
		}
		// Ingest ducc.properties
		duccPropertiesResolver = DuccPropertiesResolver.getInstance();
		// Parse
		synchronized (DuccMonitor.class) {
			command_line = new CommandLine(args, opts);
			try {
				command_line.parse();
			} catch (IllegalArgumentException e) {
				System.out.println("Illegal arguments: " + e.getMessage());
				help(opts);
				return RC_HELP;
			}

			if (command_line.contains(UiOption.Help)) {
				help(opts);
				return RC_HELP;
			}

			if (command_line.contains(UiOption.Timestamp)) {
				flag_timestamp.set(true);
			}
			if (command_line.contains(UiOption.Quiet)) {
				flag_info.set(false);
				flag_error.set(false);
			}
			if (command_line.contains(UiOption.Debug)) {
				flag_debug.set(true);
			}
			if (command_line.contains(UiOption.CancelOnInterrupt)) {
				flag_cancel_on_interrupt.set(true);
			}
			// Exactly one kind of id is used; job id takes precedence, then
			// managed reservation id, then reservation id.
			if (command_line.contains(UiOption.JobId)) {
				id = command_line.get(UiOption.JobId);
			} else if (command_line.contains(UiOption.ManagedReservationId)) {
				id = command_line.get(UiOption.ManagedReservationId);
			} else if (command_line.contains(UiOption.ReservationId)) {
				id = command_line.get(UiOption.ReservationId);
			} else {
				System.out.println(command_line.formatHelp(DuccJobMonitor.class.getName()));
				return RC_HELP;
			}
		}
		// Handle Ctl-C
		main = Thread.currentThread();
		Thread killer = new Killer(main);
		Runtime.getRuntime().addShutdownHook(killer);
		// Setup polling
		adjustWakeupInterval();
		String urlString = getUrl(id);
		String lastMessage = "";
		String thisMessage = "";
		String lastRationale = "";
		String thisRationale = "";
		StringBuilder message = new StringBuilder();
		message.append("id:" + id);
		message.append(" location:");
		message.append(ManagementFactory.getRuntimeMXBean().getName());
		info(message.toString());
		debug(urlString);
		// Poll until finished - retry if the WS appears to be down
		boolean connectionFailed = false;
		while (flag_observer.get()) {
			DuccEventHttpDispatcherCl dispatcher = null;
			MonitorInfo monitorInfo = null;
			try {
				dispatcher = new DuccEventHttpDispatcherCl(urlString, urlTimeout);
				monitorInfo = (MonitorInfo) dispatcher.dispatchJson(MonitorInfo.class);
				if (connectionFailed) {
					info("id:" + id + " warning:Connection to DUCC restored");
					connectionFailed = false;
				}
			} catch (ConnectException e) {
				// Report the outage only once; monitorInfo stays null so this
				// pass just sleeps and retries.
				if (!connectionFailed) {
					info("id:" + id + " warning:Connection to DUCC failed -- retrying");
					connectionFailed = true;
				}
			}

			// It is possible after an OR "warm" start that work item
			// processing status information may be missing or incorrect for a
			// short time. Therefore newly arrived information is only used if
			// it is not a regression from the last good one received, if any.
			// A regressed result is skipped by falling through to the sleep
			// below (rather than restarting the loop immediately), so the web
			// server is not polled in a tight loop while the regression lasts.
			boolean usable = monitorInfo != null
					&& !monitorInfo.isRegression(previousMonitorInfo);
			if (usable) {
				previousMonitorInfo = monitorInfo;
				displayRemotePids(monitorInfo);
				int stateCount = monitorInfo.stateSequence.size();
				debug("states:" + stateCount);
				// If OR or network is very slow WS may not have seen the job yet so just report NotFound
				// No longer give up and possibly falsely cancel the job
				String state = NotFound;
				// The current state is the last entry of the sequence.
				Iterator<String> states = monitorInfo.stateSequence.iterator();
				while (states.hasNext()) {
					state = states.next();
					debug("list:" + state);
				}
				boolean reservationAssigned =
						context == DuccContext.Reservation && state.equals(StateAssigned);
				message = new StringBuilder();
				message.append("id:" + id);
				message.append(" state:" + state);
				if (state.equals(StateRunning)) {
					message.append(details(monitorInfo));
				} else if (state.equals(StateCompleting)
						|| state.equals(StateCompleted)
						|| reservationAssigned) {
					// Work is finishing (or a reservation has been assigned),
					// so an interrupt must no longer trigger a cancel.
					flag_cancel_on_interrupt.set(false);
					message.append(details(monitorInfo));
				}
				thisMessage = message.toString();
				if (!thisMessage.equals(lastMessage)) {
					// A Running message is held back until at least one remote
					// pid has been seen; the first held-back message is kept
					// and emitted ahead of the next message that is shown.
					boolean suppress = false;
					if (state.equals(StateRunning)) {
						if (seenRemotePids.size() == 0) {
							suppress = true;
							if (delayedRunning == null) {
								delayedRunning = message.toString();
							}
						} else {
							delayedRunning = null;
						}
					}
					if (!suppress) {
						if (delayedRunning != null) {
							info(delayedRunning);
							delayedRunning = null;
						}
						info(thisMessage);
						lastMessage = thisMessage;
					}
				}
				if (state.equals(StateWaitingForResources)) {
					if (!monitorInfo.rationale.equals("")) {
						thisRationale = monitorInfo.rationale;
						if (!thisRationale.equals(lastRationale)) {
							info(thisRationale);
							lastRationale = thisRationale;
						}
					}
				}
				if (reservationAssigned) {
					if (monitorInfo.nodes != null && monitorInfo.nodes.size() > 0) {
						StringBuilder sb = new StringBuilder();
						sb.append("nodes: ");
						for (String node : monitorInfo.nodes) {
							sb.append(node);
							sb.append(" ");
						}
						info(sb.toString().trim());
					}
					return RC_SUCCESS;
				}
				if (state.equals(StateCompleted)) {
					// See Jira 2911: the outcome is decided by comparing total
					// and done, not by the process count.
					if (monitorInfo.total.equals(monitorInfo.done)) {
						if (!monitorInfo.rationale.equals("")) {
							message = new StringBuilder();
							message.append("id:" + id);
							message.append(" rationale:" + monitorInfo.rationale);
							thisMessage = message.toString();
							info(thisMessage);
						}
						// A non-numeric code is reported verbatim and the
						// return code stays RC_FAILURE.
						int rc = RC_FAILURE;
						message = new StringBuilder();
						message.append("id:" + id);
						try {
							rc = Integer.parseInt(monitorInfo.code);
							message.append(" rc:" + rc);
						} catch (NumberFormatException e) {
							message.append(" code:" + monitorInfo.code);
						}
						thisMessage = message.toString();
						info(thisMessage);
						return rc;
					} else {
						if (!monitorInfo.errorLogs.isEmpty()) {
							message = new StringBuilder();
							message.append("id:" + id);
							List<String> errorLogs = monitorInfo.errorLogs;
							for (String errorLog : errorLogs) {
								message.append(" file:" + errorLog);
							}
							thisMessage = message.toString();
							info(thisMessage);
						}
						if (!monitorInfo.rationale.equals("")) {
							message = new StringBuilder();
							message.append("id:" + id);
							message.append(" rationale:" + monitorInfo.rationale);
							thisMessage = message.toString();
							info(thisMessage);
						}
						message = new StringBuilder();
						message.append("id:" + id);
						message.append(" rc:" + RC_FAILURE);
						thisMessage = message.toString();
						info(thisMessage);
						return RC_FAILURE;
					}
				}
			}
			// Wait out the wake-up interval before the next poll, leaving
			// early if the observer flag is cleared.
			long start = System.currentTimeMillis();
			long end = start;
			while (!isTimeExpired(start, end, wakeupInterval)) {
				if (!flag_observer.get()) {
					break;
				}
				try {
					Thread.sleep(wakeupInterval);
				} catch (InterruptedException e) {
					// The interrupt status is deliberately not restored: with
					// it set, every following sleep would throw at once and
					// this loop would spin. Loop exit is driven by
					// flag_observer and the elapsed-time check instead.
					debug(e);
				}
				end = System.currentTimeMillis();
			}

		}
		return RC_SUCCESS;
	}