public TreeMap call()

in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/NodeUsersCollector.java [250:495]


  /**
   * Scrapes the output of {@code ps} and builds a per-user view of the processes running on this
   * node, flagging processes that look rogue.
   * <p>
   * The returned map is first seeded (when an agent is available) with the user reservations and
   * the previously detected rogue processes known to the agent. The {@code ps} output is then
   * processed in two passes: the first pass records every process (pid, ppid, owner) so ancestry
   * can be looked up, the second pass applies the exclusion rules and hands candidate processes
   * to the agent for rogue detection. Lines of {@code ps} output that do not carry the five
   * expected columns (user, pid, ppid, uid, args) are ignored.
   * <p>
   * Any exception raised while scanning is logged (or printed when no logger is configured) and
   * the scan stops; whatever was collected up to that point is still returned.
   *
   * @return map keyed by user name holding the reservation / process information collected for
   *         each user; never {@code null}
   * @throws Exception
   *           declared by the signature; failures during the scan itself are caught and logged
   */
  public TreeMap<String, NodeUsersInfo> call() throws Exception {
    String location = "call";
    TreeMap<String, NodeUsersInfo> map = new TreeMap<String, NodeUsersInfo>();

    // pids of all candidate processes seen in this scan; used at the end to drop stale rogues
    List<String> currentPids = new ArrayList<String>();
    BufferedReader reader = null;
    try {

      ProcessBuilder pb;
      if (Utils.isMac()) {
        pb = new ProcessBuilder("ps", "-Ao", "user=,pid=,ppid=,uid=,args=");
      } else {
        pb = new ProcessBuilder("ps", "-Ao", "user:32,pid,ppid,uid,args", "--no-heading");
      }
      // stderr is merged into stdout, so error text from ps may show up as ordinary lines
      pb.redirectErrorStream(true);
      Process proc = pb.start();
      // spawn ps command and scrape the output
      reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
      String line;
      String regex = "\\s+";
      if (agent != null) {
        // copy all known reservations reported by the OR
        agent.copyAllUserReservations(map);
      }
      if (logger != null) {
        logger.debug(location, null,
                "********** User Process Map Size After copyAllUserReservations:" + map.size());
      }
      if (agent != null) {
        // copy all known rogue processes detected previously
        agent.getRogueProcessReaper().copyAllUserRogueProcesses(map);
      }
      if (logger != null) {
        logger.debug(location, null,
                "********** User Process Map Size After copyAllUserRougeProcesses:" + map.size());
      }
      // Hierarchy of candidate processes built up by aggregate(). Used to determine if a process
      // has a parent which is a rogue process.
      Set<NodeUsersCollector.ProcessInfo> processList = new HashSet<NodeUsersCollector.ProcessInfo>();

      Set<RunningProcess> tempProcessList = new HashSet<RunningProcess>();

      // To detect rogues there are two scans through process list:
      // #1 - fills tempProcessList which will be used to check each
      // process parent if it is owned by ducc.
      // #2 - the actual rogue process detection loop

      List<String> procList = new ArrayList<String>();
      // read the next line from ps output
      while ((line = reader.readLine()) != null) {
        // save line for subsequent processing in the for..loop below
        procList.add(line);
        String tokens[] = line.split(regex);
        // user, pid and ppid are required here; skip blank or truncated lines instead of
        // failing the whole scan with an ArrayIndexOutOfBoundsException
        if (tokens.length >= 3) {
          RunningProcess p = new RunningProcess(tokens[1], tokens[2], tokens[0]);
          // add process to a list which is used to look up each process parent
          tempProcessList.add(p);
        }
      }
      // the above loop filled tempProcessList, so now detect rogue processes.
      for (String procInfo : procList) {
        String tokens[] = procInfo.split(regex);
        // expect at least: user pid ppid uid args. Anything shorter is not a process record.
        if (tokens.length < 5) {
          continue;
        }
        String user = tokens[0];
        String pid = tokens[1];
        String ppid = tokens[2];
        String uid = tokens[3];
        // only the first token of the args column (the executable) is considered
        String cmd = tokens[4];

        try {
          // by convention processes owned by uid < uidMax are system processes thus not rogue
          if (Integer.parseInt(uid) < uidMax) {
            continue;
          }
        } catch (NumberFormatException ignored) {
          // uid is not numeric, so the system-process shortcut cannot be applied. Fall through
          // and let the remaining checks decide.
        }
        // walk up the tree of ancestor processes to check if any is owned by ducc. If so, this
        // process is not rogue.
        if (processAncestorIsOwnedByDucc(pid, tempProcessList)) {
          continue; // skip as this is not a rogue process
        }
        boolean ghost = false;
        boolean jobOrServiceProcess = isJobOrServiceProcess(tokens);
        if (jobOrServiceProcess) {
          ghost = ghostJobOrServiceProcess(pid);
          if (!ghost) {
            continue;
          }
        }

        // any process owned by user who started the agent process is not rogue
        if (ducc_user.equalsIgnoreCase(user) && !ghost) {
          continue;
        }
        if (logger != null) {
          // log the ps record being examined ('line' is always null once the read loop is done)
          logger.trace(location, null, procInfo);
        }
        // Check if current process is owned by a user that should be excluded
        // from rogue process detection. A list of excluded users is in ducc.properties
        // Dont include root, nfs, and other system owned processes. Also exclude
        // processes that are defined in the process exclusion list in ducc.properties
        if (excludeUser(user) || excludeProcess(cmd) || Utils.getPID().equals(pid)) {
          continue; // skip this process
        }
        if (agent == null) {
          // without an agent there is no inventory to compare against
          continue;
        }
        // Check if user record is already in the map. May have been done above in
        // copyAllUserReservations().
        NodeUsersInfo nui = map.get(user);
        if (nui == null) {
          nui = new NodeUsersInfo(user);
          map.put(user, nui);
        }
        if (logger != null) {
          logger.info(location, null,
                  "User:" + user + " Reservations:" + nui.getReservations().size()
                          + " Rogue Processes:" + nui.getRogueProcesses().size());
        }
        // add a process to a list of processes currently running on the node. The list will be
        // used to remove stale rogue processes at the end of this method
        currentPids.add(pid);
        if (logger != null) {
          logger.trace(location, null, "Current Promuscess (Before Calling aggregate() - PID:"
                  + pid + " PPID:" + ppid + " Process List Size:" + processList.size());
        }
        NodeUsersCollector.ProcessInfo pi = new NodeUsersCollector.ProcessInfo(
                Integer.parseInt(pid), Integer.parseInt(ppid));
        // add the process to the list of processes. If this process has a parent, it will be
        // added as a child. Compose hierarchy of processes so that we can use it later to
        // determine if any given process has a parent that is rogue
        aggregate(processList, pi);

        // fetch user reservations
        List<IDuccId> userReservations = nui.getReservations();
        // if user has reservations on the node, any process found is not a rogue process
        if (userReservations.size() > 0) {
          boolean found = false;
          // check if this process has previously been marked as rogue
          for (NodeProcess rogue : nui.getRogueProcesses()) {
            if (rogue.getPid().equals(pid)) {
              found = true;
              break;
            }
          }

          if (!found && !agent.isManagedProcess(processList, pi)) {
            // code keeps count of java and non-java processes separately, so pass the type of
            // process (java or not) to allow distinct accounting
            nui.addPid(pid, ppid, cmd.endsWith("java"));
          }
          // all we know is that the user has a reservation and there is a process running. If
          // there are reservations, we cant determine which user process is a rogue process
          continue;
        }

        // detect if this is a rogue process and add it to the rogue process list. First check
        // if the current process has a parent and if so, check if the parent is rogue. Second,
        // if parent is not rogue (or no parent) check if the process is in agent's inventory.
        // If its not, we have a rogue process.
        if (agent.isRogueProcess(user, processList, pi)) {
          if (nui.getRogueProcesses().size() == 0
                  || !inRogueList(nui.getRogueProcesses(), pid)) {
            pi.setRogue(true);
          }
          agent.getRogueProcessReaper().submitRogueProcessForKill(user, pid, ppid,
                  cmd.endsWith("java"));
        }
      }
    } catch (Exception e) {
      // NOTE(review): a failure part-way through leaves currentPids incomplete, yet it is still
      // passed to removeDeadRogueProcesses() below - confirm that is the intended behavior.
      if (logger == null) {
        e.printStackTrace();
      } else {
        logger.error(location, null, e);
      }
    } finally {
      if (reader != null) {
        try {
          // closing the reader also closes the underlying process stream
          reader.close();
        } catch (Exception ignored) {
          // best effort: nothing useful can be done if close fails
        }
      }
    }
    StringBuilder sb = new StringBuilder();
    // if no processes found, clear rogue process list and list of processes associated with a
    // reserve
    if (currentPids.isEmpty()) {
      for (Map.Entry<String, NodeUsersInfo> entry : map.entrySet()) {
        entry.getValue().getReserveProcesses().clear();
        entry.getValue().getRogueProcesses().clear();
      }
    }

    for (Map.Entry<String, NodeUsersInfo> entry : map.entrySet()) {
      sb.append(entry.getValue().toString()).append("\n");
    }
    if (logger == null) {
      System.out.println(sb.toString());
      System.out.println(
              "***************************************************************************************");
    } else {
      if (sb.length() > 0) {
        logger.info(location, null, sb.toString());
        logger.info(location, null,
                "******************************************************************************");
      }
    }
    // remove any rogue processes that are not in the list of current processes just collected.
    // agent is treated as optional everywhere above, so guard it here as well.
    if (agent != null) {
      agent.getRogueProcessReaper().removeDeadRogueProcesses(currentPids);
    }
    return map;
  }