in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java [1320:1521]
public void updateProcessStatus(ProcessStateUpdateDuccEvent duccEvent) throws Exception {
String methodName = "updateProcessStatus";
try {
inventorySemaphore.acquire();
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
// Check if process with a given unique DuccId exist in the
// local map
if (processEntry.getKey().getUnique().equals(duccEvent.getDuccProcessId())) {
// found it. Update pid and state of the process
if (duccEvent.getPid() != null && processEntry.getValue().getPID() == null) {
processEntry.getValue().setPID(duccEvent.getPid());
}
if (duccEvent.getProcessJmxUrl() != null
&& processEntry.getValue().getProcessJmxUrl() == null) {
processEntry.getValue().setProcessJmxUrl(duccEvent.getProcessJmxUrl());
}
ITimeWindow tw = processEntry.getValue().getTimeWindowInit();
if (tw != null) {
if (!duccEvent.getState().equals(ProcessState.Initializing)) {
// Mark the time the process ended initialization. It also
// covers a case when the process terminates while initializing
tw.setEnd(TimeStamp.getCurrentMillis());
if (duccEvent.getState().equals(ProcessState.Running)) {
ITimeWindow twr = new TimeWindow();
String millis;
millis = TimeStamp.getCurrentMillis();
// Mark the time the process started running
processEntry.getValue().setTimeWindowRun(twr);
twr.setStart(millis);
}
}
} else {
logger.info(methodName, null,
"++++++++++++ Agent Init TimeWindow not available - tw==null");
}
ManagedProcess deployedProcess = null;
synchronized (monitor) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// Find ManagedProcess instance the DuccProcess
// instance is associated with
ManagedProcess dProcess = it.next();
if (dProcess.getDuccProcess().getDuccId().getUnique()
.equals(duccEvent.getDuccProcessId())) {
deployedProcess = dProcess;
break;
}
}
}
if (processEntry.getValue().getProcessState() != ProcessState.Running
&& duccEvent.getState().equals(ProcessState.Running) && deployedProcess != null) {
// cancel process initialization timer.
deployedProcess.stopInitializationTimer();
}
logger.info(methodName, null,
">>>> Agent Handling Process Update - Ducc Id: "
+ processEntry.getValue().getDuccId() + " PID:"
+ processEntry.getValue().getPID() + " Status:" + duccEvent.getState()
+ " Deallocated:" + processEntry.getValue().isDeallocated());
if (deployedProcess != null && deployedProcess.getSocketEndpoint() == null
&& duccEvent.getServiceEdnpoint() != null) {
deployedProcess.setSocketEndpoint(duccEvent.getServiceEdnpoint());
}
// This is a delayed stop. Previously a request to stop the
// process was received
// but the PID was not available yet. Instead a flag was set
// to initiate a
// stop after the process reports the PID.
if (deployedProcess != null && deployedProcess.killAfterLaunch()) {
logger.info(methodName, null,
">>>> Process Ducc Id:" + processEntry.getValue().getDuccId()
+ " Was Previously Tagged for Kill While It Was Starting");
undeployProcess(processEntry.getValue());
} else if (deployedProcess != null && deployedProcess.doKill() && deployedProcess
.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
deployedProcess.getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
} else if (deployedProcess != null && (deployedProcess.doKill()
|| deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Failed)
|| deployedProcess.getDuccProcess().getProcessState()
.equals(ProcessState.Killed))) {
// The process has already stopped, but managed to send
// the last update before dying. Ignore the update
return;
} else if (changeState(processEntry.getValue().getProcessState())) {
logger.info(methodName, null, "=============== PID:" + processEntry.getValue().getPID()
+ " Changing State - current state:" + processEntry.getValue().getProcessState()
+ " New State:" + duccEvent.getState());
processEntry.getValue().setProcessState(duccEvent.getState());
DuccEventDispatcher dispatcher = configurationFactory
.getORDispatcher(super.getContext());
try {
DefaultNodeInventoryProcessor processor = configurationFactory
.nodeInventoryProcessor(this);
Map<DuccId, IDuccProcess> inventoryCopy = (Map<DuccId, IDuccProcess>) SerializationUtils
.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);
processor.dispatchInventoryUpdate(dispatcher,
configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy);
logger.info(methodName, null, "Sent Node Inventory Update to the OR - process PID:"
+ processEntry.getValue().getPID());
} catch (Exception e) {
logger.warn("", null, e);
}
// if the process is Stopping, it must have hit an error threshold
}
// Check if MemoryCollector should be created for this
// process. It collects
// resident memory of the process at regular intervals.
// Should only be added
// once for each process. This route will have its id set to
// process PID.
if (addProcessMemoryCollector(duccEvent.getPid())
&& (duccEvent.getState().equals(ProcessState.Initializing)
|| duccEvent.getState().equals(ProcessState.Running))) {
if (duccEvent.getState().equals(ProcessState.Running)) {
if (processEntry.getValue().getUimaPipelineComponents() != null
&& processEntry.getValue().getUimaPipelineComponents().size() > 0) {
processEntry.getValue().getUimaPipelineComponents().clear();
if (duccEvent.getUimaPipeline() != null) {
duccEvent.getUimaPipeline().clear();
}
}
}
} else if (duccEvent.getState().equals(ProcessState.Stopped)
|| duccEvent.getState().equals(ProcessState.Failed)
|| duccEvent.getState().equals(ProcessState.Killed)) {
if (deployedProcess.getMetricsProcessor() != null) {
deployedProcess.getMetricsProcessor().close(); // close open fds (stat and statm
// files)
}
logger.info(methodName, null,
"----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
+ duccEvent.getPid());
} else if (duccEvent.getState().equals(ProcessState.FailedInitialization)) {
logger.info(methodName, null,
">>>> Agent Handling Process FailedInitialization. PID:" + duccEvent.getPid());
deployedProcess.getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.FailedInitialization.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
deployedProcess.kill();
logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:"
+ duccEvent.getPid() + " Killing Process");
undeployProcess(processEntry.getValue());
} else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) {
deployedProcess.getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.InitializationTimeout.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
// Mark process for death. Doesnt actually kill the process
deployedProcess.kill();
logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:"
+ duccEvent.getPid() + " Killing Process");
undeployProcess(processEntry.getValue());
} else if (duccEvent.getState().equals(ProcessState.Stopping)) {
if (duccEvent.getMessage() != null && duccEvent.getMessage()
.equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
processEntry.getValue().setReasonForStoppingProcess(
ReasonForStoppingProcess.ExceededErrorThreshold.toString());
}
if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)
&& !deployedProcess.getDuccProcess().getProcessState()
.equals(ProcessState.Stopping)) {
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
}
}
if (duccEvent.getUimaPipeline() != null) {
StringBuffer buffer = new StringBuffer("\t\tUima Pipeline -");
for (IUimaPipelineAEComponent uimaAeState : duccEvent.getUimaPipeline()) {
buffer.append("\n\t\tAE:").append(uimaAeState.getAeName()).append(" state:")
.append(uimaAeState.getAeState()).append(" InitTime:")
.append(uimaAeState.getInitializationTime() / 1000).append(" secs. Thread:")
.append(uimaAeState.getAeThreadId());
}
logger.info(methodName, null, buffer.toString());
((DuccProcess) processEntry.getValue())
.setUimaPipelineComponents(duccEvent.getUimaPipeline());
}
return; // found it. Done
}
}
} catch (InterruptedException e) {
} finally {
inventorySemaphore.release();
}
}