in uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java [202:329]
/**
 * Pings the service by sending a UIMA-AS get-meta request over JMS and waiting
 * up to {@code meta_timeout} milliseconds for the replies.
 *
 * <p>Two replies are awaited on a worker thread: the first carries the IP and PID
 * of the process that handles the request (stored in {@code nodeIp} / {@code pid}),
 * the second is the get-meta reply itself, whose content is not inspected.
 *
 * <p>Any JMS error, timeout, worker failure or interruption marks the returned
 * statistics as not alive and not healthy. JMS resources are released through
 * {@code stop()} and the worker executor is shut down on every path. If the calling
 * thread was interrupted while waiting, its interrupt status is restored before
 * this method returns.
 *
 * @return the statistics object for this ping; its info text is the failure
 *         reason, a plain "ok" message, or the formatted monitor output when a
 *         monitor is configured
 */
public IServiceStatistics getStatistics()
{
    String methodName = "getStatistics";
    doLog(methodName, "***********************************************");
    IServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
    String failure_reason = null;

    nodeIp = "N/A";
    pid = "N/A";

    // NOTE(review): evaluateService() runs before the get-meta is sent; presumably it
    // sets an initial health verdict, which is kept below when the ping succeeds.
    evaluateService(statistics);

    ExecutorService executor = null;
    Exception excp = null;          // non-null if and only if the ping failed
    boolean interrupted = false;    // remembers an interrupt so it can be restored after cleanup
    Future<Boolean> future = null;

    try {
        initJMS();

        TextMessage msg = producerSession.createTextMessage();
        msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName());
        msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
        msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
        msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta);
        msg.setJMSReplyTo(consumerDestination);
        msg.setText("");

        doLog(methodName, "Sending getMeta request to " + endpoint + " at " + brokerURI);
        producer.send(msg);
        long startTime = System.currentTimeMillis();

        executor = Executors.newSingleThreadExecutor();
        future = executor.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                // First receive() is to get IP and PID of the process that will process getMeta
                ActiveMQTextMessage serviceInfoReply = (ActiveMQTextMessage) consumer.receive();
                if ( serviceInfoReply == null ) {
                    // JMS receive() yields null when the consumer is closed while waiting
                    throw new IllegalStateException("No service-info reply received (consumer closed?)");
                }
                nodeIp = serviceInfoReply.getStringProperty(AsynchAEMessage.ServerIP);
                pid = serviceInfoReply.getStringProperty(AsynchAEMessage.UimaASProcessPID);

                // Second receive() is for the GetMeta reply. Its content is not needed:
                // a non-null message is enough to call the GetMeta good.
                if ( consumer.receive() == null ) {
                    throw new IllegalStateException("No getMeta reply received (consumer closed?)");
                }
                return true;
            }
        });

        // wait for getMeta reply and timeout if not received within allotted window
        future.get(meta_timeout, TimeUnit.MILLISECONDS);

        long replyTime = System.currentTimeMillis() - startTime;
        statistics.setAlive(true);
        // keep whatever health verdict evaluateService() established
        statistics.setHealthy(statistics.isHealthy());
        statistics.setInfo("Get-meta took " + replyTime + " msecs.");
        doLog(methodName, "Reply received in ", replyTime, " ms");
    } catch ( ExecutionException e) {
        excp = e;
        doLog(methodName, null, "Error while awaiting getmeta reply from ", nodeIp, "PID", pid);
        recordPingFailure(statistics, e, future);
    } catch ( InterruptedException e) {
        excp = e;
        // Do not re-interrupt yet: the cleanup in 'finally' should not run with the flag set.
        interrupted = true;
        doLog(methodName, null, "Thread interrupted while waiting for getmeta reply from ", nodeIp, "PID", pid);
        recordPingFailure(statistics, e, future);
    } catch( TimeoutException e) {
        excp = e;
        doLog(methodName, null, "Get-Meta timeout ("+meta_timeout+" ms) from ", nodeIp, "PID", pid);
        recordPingFailure(statistics, e, future);
    } catch (JMSException e) {
        // Raised while setting up or sending the request, i.e. before a worker exists.
        excp = e;
        recordPingFailure(statistics, e, future);
    } finally {
        stop();
        if ( executor != null ) {
            executor.shutdownNow();
        }
    }

    boolean gmfail = (excp != null);
    if ( gmfail ) {
        failure_reason = "Cannot issue getMeta to: " + endpoint + ":" + brokerURI;
        // Prefer the underlying cause (e.g. what the worker threw) when there is one.
        if ( excp.getCause() == null ) {
            failure_reason = failure_reason + ": " + excp.toString();
        } else {
            failure_reason = failure_reason + ": " + excp.getCause();
        }
        doLog(methodName, failure_reason);
    }

    if ( monitor == null ) {                   // no jmx active
        if ( failure_reason != null ) {
            statistics.setInfo(failure_reason);
        } else if ( failover ) {
            statistics.setInfo("(JMX statistics not available for failover protocol)");
        } else {
            statistics.setInfo("Ping to " + nodeIp + ": " + pid + " ok. (JMX disabled.)");
        }
    } else {
        monitor.setSource(nodeIp, pid, gmfail, failure_reason);
        statistics.setInfo(monitor.format());
    }

    if ( interrupted ) {
        // Restore the interrupt status so the caller can observe the interruption.
        Thread.currentThread().interrupt();
    }
    return statistics;
}

/**
 * Marks the statistics as a failed ping and cancels the pending reply wait, if any.
 *
 * @param statistics the statistics object to flag as not alive and not healthy
 * @param cause      the exception that made the ping fail; used in the info text
 * @param pending    the future awaiting the replies, or {@code null} if none was started
 */
private void recordPingFailure(IServiceStatistics statistics, Exception cause, Future<?> pending)
{
    statistics.setHealthy(false);
    statistics.setAlive(false);
    statistics.setInfo("Ping error: " + cause);
    if ( pending != null ) {
        pending.cancel(true);
    }
}