in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java [250:495]
/**
 * Scrapes the output of {@code ps} and builds a per-user view of the processes running on this
 * node, flagging those that look like rogue processes.
 *
 * <p>
 * The scan is done in two passes over the {@code ps} output. The first pass records every
 * process (pid, ppid, owner) so that ancestry can be looked up. The second pass filters out
 * processes that are not candidates (low uid, ducc-owned ancestor, live job/service process,
 * agent owner, excluded user/command, the agent itself) and hands the remainder to the agent for
 * rogue detection.
 *
 * <p>
 * Lines of {@code ps} output that do not carry the five expected columns (user, pid, ppid, uid,
 * command) are skipped instead of aborting the whole scan.
 *
 * @return map keyed by user name; returned even when the scan fails part way, in which case it
 *         holds whatever was collected up to the failure
 * @throws Exception
 *           declared by the signature; failures raised inside the scan itself are logged (or
 *           printed when no logger is set) and not propagated
 */
public TreeMap<String, NodeUsersInfo> call() throws Exception {
  String location = "call";
  // number of leading ps columns needed to build the parent lookup: user, pid, ppid
  final int minLookupColumns = 3;
  // number of leading ps columns needed for rogue detection: user, pid, ppid, uid, cmd
  final int minDetectColumns = 5;
  TreeMap<String, NodeUsersInfo> map = new TreeMap<String, NodeUsersInfo>();
  List<String> currentPids = new ArrayList<String>();
  InputStream stream = null;
  BufferedReader reader = null;
  try {
    ProcessBuilder pb;
    if (Utils.isMac()) {
      pb = new ProcessBuilder("ps", "-Ao", "user=,pid=,ppid=,uid=,args=");
    } else {
      pb = new ProcessBuilder("ps", "-Ao", "user:32,pid,ppid,uid,args", "--no-heading");
    }
    pb.redirectErrorStream(true);
    // spawn ps command and scrape the output
    Process proc = pb.start();
    stream = proc.getInputStream();
    reader = new BufferedReader(new InputStreamReader(stream));
    String line;
    String regex = "\\s+";

    if (agent != null) {
      // copy all known reservations reported by the OR
      agent.copyAllUserReservations(map);
    }
    if (logger != null) {
      logger.debug(location, null,
              "********** User Process Map Size After copyAllUserReservations:" + map.size());
    }
    if (agent != null) {
      // copy all known rogue processes detected previously
      agent.getRogueProcessReaper().copyAllUserRogueProcesses(map);
    }
    if (logger != null) {
      logger.debug(location, null,
              "********** User Process Map Size After copyAllUserRougeProcesses:" + map.size());
    }

    // Hierarchy of candidate processes. Used to determine if a process has a parent which is a
    // rogue process.
    Set<NodeUsersCollector.ProcessInfo> processList = new HashSet<NodeUsersCollector.ProcessInfo>();
    // Flat list of every process seen; used to walk up the ancestors of a process.
    Set<RunningProcess> tempProcessList = new HashSet<RunningProcess>();

    // To detect rogues there are two scans through the process list:
    // #1 - fills tempProcessList which is used to check if any ancestor of a process is owned
    // by ducc.
    // #2 - the actual rogue process detection loop
    List<String> procList = new ArrayList<String>();
    while ((line = reader.readLine()) != null) {
      // save line for subsequent processing in the detection loop below
      procList.add(line);
      String[] tokens = line.split(regex);
      // need user, pid and ppid; anything shorter cannot be placed in the process tree
      if (tokens.length >= minLookupColumns) {
        tempProcessList.add(new RunningProcess(tokens[1], tokens[2], tokens[0]));
      }
    }

    // the above loop filled tempProcessList, so now detect rogue processes.
    for (String procInfo : procList) {
      String[] tokens = procInfo.split(regex);
      // Validate before indexing: a short line would otherwise raise
      // ArrayIndexOutOfBoundsException and abort the scan for every remaining process.
      if (tokens.length < minDetectColumns) {
        if (logger != null) {
          logger.trace(location, null, "Skipping malformed ps line:" + procInfo);
        }
        continue;
      }
      String user = tokens[0];
      String pid = tokens[1];
      String ppid = tokens[2];
      String uid = tokens[3];
      String cmd = tokens[4];

      try {
        // by convention processes owned by uid < uidMax are system processes thus not rogue
        if (Integer.valueOf(uid) < uidMax) {
          continue;
        }
      } catch (NumberFormatException ignored) {
        // uid column is not numeric; fall through and let the remaining checks decide
      }
      // walk up the tree of ancestor processes to check if any is owned by ducc. If so, this
      // process is not rogue.
      if (processAncestorIsOwnedByDucc(pid, tempProcessList)) {
        continue;
      }
      boolean ghost = false;
      if (isJobOrServiceProcess(tokens)) {
        ghost = ghostJobOrServiceProcess(pid);
        if (!ghost) {
          continue;
        }
      }
      // any process owned by user who started the agent process is not rogue
      if (ducc_user.equalsIgnoreCase(user) && !ghost) {
        continue;
      }
      if (logger != null) {
        // trace the line being examined (the read-loop variable is null by now)
        logger.trace(location, null, procInfo);
      }
      // Check if current process is owned by a user that should be excluded
      // from rogue process detection. A list of excluded users is in ducc.properties
      // Dont include root, nfs, and other system owned processes. Also exclude
      // processes that are defined in the process exclusion list in ducc.properties
      if (excludeUser(user) || excludeProcess(cmd) || Utils.getPID().equals(pid)) {
        continue;
      }
      if (agent == null) {
        // nothing below can be done without an agent
        continue;
      }

      // Check if user record is already in the map. May have been done above in
      // copyAllUserReservations().
      NodeUsersInfo nui;
      if (map.containsKey(user)) {
        nui = map.get(user);
      } else {
        nui = new NodeUsersInfo(user);
        map.put(user, nui);
      }
      if (logger != null) {
        logger.info(location, null,
                "User:" + user + " Reservations:" + nui.getReservations().size()
                        + " Rogue Processes:" + nui.getRogueProcesses().size());
      }
      // add a process to a list of processes currently running on the node. The list will be
      // used to remove stale rogue processes at the end of this method
      currentPids.add(pid);
      if (logger != null) {
        logger.trace(location, null, "Current Promuscess (Before Calling aggregate() - PID:"
                + pid + " PPID:" + ppid + " Process List Size:" + processList.size());
      }
      NodeUsersCollector.ProcessInfo pi = new NodeUsersCollector.ProcessInfo(
              Integer.parseInt(pid), Integer.parseInt(ppid));
      // add the process to the list of processes. If this process has a parent, it will be
      // added as a child. Compose hierarchy of processes so that we can use it later to
      // determine if any given process has a parent that is rogue
      aggregate(processList, pi);

      // if user has reservations on the node, any process found is not a rogue process
      List<IDuccId> userReservations = nui.getReservations();
      if (userReservations.size() > 0) {
        // check if this process has previously been marked as rogue
        boolean found = false;
        for (NodeProcess rogue : nui.getRogueProcesses()) {
          if (rogue.getPid().equals(pid)) {
            found = true;
            break;
          }
        }
        if (!found && !agent.isManagedProcess(processList, pi)) {
          // code keeps count of java and non-java processes separately, so pass the type of
          // process (java or not) to allow distinct accounting
          nui.addPid(pid, ppid, cmd.endsWith("java"));
        }
        // all we know is that the user has a reservation and there is a process running. If
        // there are reservations, we cant determine which user process is a rogue process
        continue;
      }

      // detect if this is a rogue process and add it to the rogue process list. First check
      // if the current process has a parent and if so, check if the parent is rogue. Second,
      // if parent is not rogue (or no parent) check if the process is in agent's inventory.
      // If its not, we have a rogue process.
      if (agent.isRogueProcess(user, processList, pi)) {
        if (nui.getRogueProcesses().size() == 0
                || !inRogueList(nui.getRogueProcesses(), pid)) {
          pi.setRogue(true);
        }
        agent.getRogueProcessReaper().submitRogueProcessForKill(user, pid, ppid,
                cmd.endsWith("java"));
      }
    }
  } catch (Exception e) {
    if (logger == null) {
      e.printStackTrace();
    } else {
      logger.error(location, null, e);
    }
  } finally {
    if (reader != null) {
      try {
        reader.close();
      } catch (Exception ignored) {
        // best effort: nothing useful can be done if closing the ps output fails
      }
    }
  }

  // if no processes found, clear rogue process list and list of processes associated with a
  // reserve
  if (currentPids.isEmpty()) {
    for (Map.Entry<String, NodeUsersInfo> entry : map.entrySet()) {
      entry.getValue().getReserveProcesses().clear();
      entry.getValue().getRogueProcesses().clear();
    }
  }
  StringBuffer sb = new StringBuffer();
  for (Map.Entry<String, NodeUsersInfo> entry : map.entrySet()) {
    sb.append(entry.getValue().toString()).append("\n");
  }
  if (logger == null) {
    System.out.println(sb.toString());
    System.out.println(
            "***************************************************************************************");
  } else if (sb.length() > 0) {
    logger.info(location, null, sb.toString());
    logger.info(location, null,
            "******************************************************************************");
  }
  // remove any rogue processes that are not in the list of current processes just collected.
  // agent is treated as optional throughout this method, so guard it here as well.
  if (agent != null) {
    agent.getRogueProcessReaper().removeDeadRogueProcesses(currentPids);
  }
  return map;
}