in uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccMonitor.java [284:525]
/**
 * Monitors the job, managed reservation or reservation named on the command
 * line by repeatedly polling the monitor URL until a final state is observed
 * or the observer flag is cleared.
 *
 * @param args the command-line arguments to parse
 * @return {@code RC_FAILURE} if DUCC_HOME cannot be found, or if the work
 *         reached the Completed state without all items done (or with a
 *         non-numeric completion code); {@code RC_HELP} if help was requested,
 *         the arguments were illegal, or no id option was supplied; the
 *         numeric completion code reported by the monitor when the work
 *         completed with all items done; {@code RC_SUCCESS} when a reservation
 *         is assigned or when the observer flag is cleared
 * @throws Exception any failure other than a {@link ConnectException} raised
 *         while dispatching the monitor request (connection failures are
 *         retried)
 */
private int runInternal(String[] args) throws Exception {
    // DUCC_HOME
    String ducc_home = Utils.findDuccHome();
    if (ducc_home == null) {
        messageProcessor
                .status("Missing required environment variable: DUCC_HOME");
        return RC_FAILURE;
    }
    // Ingest ducc.properties
    duccPropertiesResolver = DuccPropertiesResolver.getInstance();
    // Parse
    synchronized (DuccMonitor.class) {
        command_line = new CommandLine(args, opts);
        try {
            command_line.parse();
        } catch (IllegalArgumentException e) {
            System.out.println("Illegal arguments: " + e.getMessage());
            help(opts);
            return RC_HELP;
        }
        if (command_line.contains(UiOption.Help)) {
            help(opts);
            return RC_HELP;
        }
        if (command_line.contains(UiOption.Timestamp)) {
            flag_timestamp.set(true);
        }
        if (command_line.contains(UiOption.Quiet)) {
            flag_info.set(false);
            flag_error.set(false);
        }
        if (command_line.contains(UiOption.Debug)) {
            flag_debug.set(true);
        }
        if (command_line.contains(UiOption.CancelOnInterrupt)) {
            flag_cancel_on_interrupt.set(true);
        }
        // Exactly one kind of id is used; the first option found wins.
        if (command_line.contains(UiOption.JobId)) {
            id = command_line.get(UiOption.JobId);
        } else if (command_line.contains(UiOption.ManagedReservationId)) {
            id = command_line.get(UiOption.ManagedReservationId);
        } else if (command_line.contains(UiOption.ReservationId)) {
            id = command_line.get(UiOption.ReservationId);
        } else {
            System.out.println(command_line.formatHelp(DuccJobMonitor.class.getName()));
            return RC_HELP;
        }
    }
    // Handle Ctl-C
    main = Thread.currentThread();
    Thread killer = new Killer(main);
    Runtime.getRuntime().addShutdownHook(killer);
    // Setup polling
    adjustWakeupInterval();
    String urlString = getUrl(id);
    String lastMessage = "";
    String thisMessage = "";
    String lastRationale = "";
    String thisRationale = "";
    StringBuilder message = new StringBuilder();
    message.append("id:" + id);
    message.append(" location:");
    message.append(ManagementFactory.getRuntimeMXBean().getName());
    info(message.toString());
    debug(urlString);
    // Poll until finished - retry if the WS appears to be down
    boolean connectionFailed = false;
    while (flag_observer.get()) {
        DuccEventHttpDispatcherCl dispatcher = null;
        MonitorInfo monitorInfo = null;
        try {
            dispatcher = new DuccEventHttpDispatcherCl(urlString, urlTimeout);
            monitorInfo = (MonitorInfo) dispatcher.dispatchJson(MonitorInfo.class);
            if (connectionFailed) {
                info("id:" + id + " warning:Connection to DUCC restored");
                connectionFailed = false;
            }
        } catch (ConnectException e) {
            // Report the outage once, then keep retrying silently.
            if (!connectionFailed) {
                info("id:" + id + " warning:Connection to DUCC failed -- retrying");
                connectionFailed = true;
            }
        }
        if (monitorInfo != null) {
            // It is possible after OR "warm" start that
            // work item processing status information
            // may be missing or incorrect for a short time.
            // Therefore, we assure that newly arrived
            // information is not a regression from the
            // last good one received, if any.
            if (monitorInfo.isRegression(previousMonitorInfo)) {
                // Still wait out the polling interval; re-polling immediately
                // would issue back-to-back requests for as long as regressed
                // data keeps arriving.
                pauseUntilNextPoll();
                continue;
            }
            previousMonitorInfo = monitorInfo;
            displayRemotePids(monitorInfo);
            int stateCount = monitorInfo.stateSequence.size();
            debug("states:" + stateCount);
            // If OR or network is very slow WS may not have seen the job yet so just report NotFound
            // No longer give up and possibly falsely cancel the job
            String state = NotFound;
            // Walk the whole sequence so that 'state' ends up as the last entry.
            Iterator<String> states = monitorInfo.stateSequence.iterator();
            while (states.hasNext()) {
                state = states.next();
                debug("list:" + state);
            }
            message = new StringBuilder();
            message.append("id:" + id);
            message.append(" state:" + state);
            if (state.equals(StateRunning)) {
                message.append(details(monitorInfo));
            } else if (state.equals(StateCompleting)) {
                flag_cancel_on_interrupt.set(false);
                message.append(details(monitorInfo));
            } else if (state.equals(StateCompleted)) {
                flag_cancel_on_interrupt.set(false);
                message.append(details(monitorInfo));
            } else if (context == DuccContext.Reservation && state.equals(StateAssigned)) { // A reservation has completed
                flag_cancel_on_interrupt.set(false);
                message.append(details(monitorInfo));
            }
            thisMessage = message.toString();
            // Only report when the status line has changed since the last report.
            if (!thisMessage.equals(lastMessage)) {
                boolean suppress = false;
                if (state.equals(StateRunning)) {
                    if (seenRemotePids.size() == 0) {
                        // Hold back the Running report until a remote pid has
                        // been seen; remember the first such message so it can
                        // be emitted later.
                        suppress = true;
                        if (delayedRunning == null) {
                            delayedRunning = message.toString();
                        }
                    } else {
                        delayedRunning = null;
                    }
                }
                if (!suppress) {
                    if (delayedRunning != null) {
                        info(delayedRunning);
                        delayedRunning = null;
                    }
                    info(thisMessage);
                    lastMessage = thisMessage;
                }
            }
            if (state.equals(StateWaitingForResources)) {
                if (!monitorInfo.rationale.equals("")) {
                    thisRationale = monitorInfo.rationale;
                    if (!thisRationale.equals(lastRationale)) {
                        info(thisRationale);
                        lastRationale = thisRationale;
                    }
                }
            }
            if (context == DuccContext.Reservation && state.equals(StateAssigned)) {
                if (monitorInfo.nodes != null) {
                    if (monitorInfo.nodes.size() > 0) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("nodes: ");
                        for (String node : monitorInfo.nodes) {
                            sb.append(node);
                            sb.append(" ");
                        }
                        String nodes = sb.toString().trim();
                        info(nodes);
                    }
                }
                return RC_SUCCESS;
            }
            if (state.equals(StateCompleted)) {
                // See Jira 2911
                //if (monitorInfo.procs.equals("0")) {
                if (monitorInfo.total.equals(monitorInfo.done)) {
                    if (!monitorInfo.rationale.equals("")) {
                        message = new StringBuilder();
                        message.append("id:" + id);
                        message.append(" rationale:" + monitorInfo.rationale);
                        thisMessage = message.toString();
                        info(thisMessage);
                    }
                    // A non-numeric completion code is reported verbatim and
                    // the return code stays RC_FAILURE.
                    int rc = RC_FAILURE;
                    message = new StringBuilder();
                    message.append("id:" + id);
                    try {
                        rc = Integer.parseInt(monitorInfo.code);
                        message.append(" rc:" + rc);
                    } catch (NumberFormatException e) {
                        message.append(" code:" + monitorInfo.code);
                    }
                    thisMessage = message.toString();
                    info(thisMessage);
                    return rc;
                } else {
                    if (!monitorInfo.errorLogs.isEmpty()) {
                        message = new StringBuilder();
                        message.append("id:" + id);
                        List<String> errorLogs = monitorInfo.errorLogs;
                        for (String errorLog : errorLogs) {
                            message.append(" file:" + errorLog);
                        }
                        thisMessage = message.toString();
                        info(thisMessage);
                    }
                    if (!monitorInfo.rationale.equals("")) {
                        message = new StringBuilder();
                        message.append("id:" + id);
                        message.append(" rationale:" + monitorInfo.rationale);
                        thisMessage = message.toString();
                        info(thisMessage);
                    }
                    message = new StringBuilder();
                    message.append("id:" + id);
                    message.append(" rc:" + RC_FAILURE);
                    thisMessage = message.toString();
                    info(thisMessage);
                    return RC_FAILURE;
                }
                //}
            }
        }
        pauseUntilNextPoll();
    }
    return RC_SUCCESS;
}

/**
 * Blocks the calling thread until the wakeup interval has expired or the
 * observer flag has been cleared, whichever comes first.
 */
private void pauseUntilNextPoll() {
    long start = System.currentTimeMillis();
    long end = start;
    while (!isTimeExpired(start, end, wakeupInterval)) {
        if (!flag_observer.get()) {
            break;
        }
        try {
            Thread.sleep(wakeupInterval);
        } catch (InterruptedException e) {
            // An interrupt only cuts the sleep short; the loop conditions
            // decide whether to keep waiting.
            debug(e);
        }
        end = System.currentTimeMillis();
    }
}