public Collection populateResources()

in ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java [425:545]


    public Collection<Resource> populateResources() throws SystemException {

      //Get full url with parameters
      String specWithParams = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
      
      //URL
      String spec = null;
      //Parameters
      String params = null;
      
      String[] tokens = specWithParams.split("\\?", 2);

      try {
        spec = tokens[0];
        params = tokens[1];
      } catch (ArrayIndexOutOfBoundsException e) {
        LOG.info(e.toString());
      }
      

      BufferedReader reader = null;
      try {
        
        //Check if host is live
        if (!hostProvider.isGangliaCollectorHostLive(clusterName)) {
          LOG.info("Ganglia host is not live");
          return Collections.emptySet();
        }
        
        //Check if Ganglia server component is live
        if (!hostProvider.isGangliaCollectorComponentLive(clusterName)) {
          LOG.info("Ganglia server component is not live");
          return Collections.emptySet();
        }

        reader = new BufferedReader(new InputStreamReader(
            getStreamProvider().readFrom(spec, "POST", params)));

        String feedStart = reader.readLine();
        if (feedStart == null || feedStart.isEmpty()) {
          LOG.info("Empty feed while getting ganglia metrics for spec => "+
            spec);
          return Collections.emptySet();
        }
        int startTime = convertToNumber(feedStart).intValue();

        String dsName = reader.readLine();
        if (dsName == null || dsName.isEmpty()) {
          LOG.info("Feed without body while reading ganglia metrics for spec " +
            "=> " + spec);
          return Collections.emptySet();
        }

        while(!dsName.equals("[AMBARI_END]")) {
          GangliaMetric metric = new GangliaMetric();
          List<GangliaMetric.TemporalMetric> listTemporalMetrics =
              new ArrayList<GangliaMetric.TemporalMetric>();

          metric.setDs_name(dsName);
          metric.setCluster_name(reader.readLine());
          metric.setHost_name(reader.readLine());
          metric.setMetric_name(reader.readLine());

          int time = convertToNumber(reader.readLine()).intValue();
          int step = convertToNumber(reader.readLine()).intValue();

          String val = reader.readLine();
          while(! val.equals("[AMBARI_DP_END]")) {
            GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(val, time);
            if (tm.isValid()) listTemporalMetrics.add(tm);
            time += step;
            val = reader.readLine();
          }

          metric.setDatapointsFromList(listTemporalMetrics);

          ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
          Set<Resource> resourceSet = resources.get(key);
          if (resourceSet != null) {
            for (Resource resource : resourceSet) {
              populateResource(resource, metric);
            }
          }

          dsName = reader.readLine();
          if (dsName == null || dsName.isEmpty()) {
            LOG.info("Unexpected end of stream reached while getting ganglia " +
              "metrics for spec => " + spec);
            return Collections.emptySet();
          }
        }
        String feedEnd = reader.readLine();
        if (feedEnd == null || feedEnd.isEmpty()) {
          LOG.info("Error reading end of feed while getting ganglia metrics " +
            "for spec => " + spec);
        } else {

          int endTime = convertToNumber(feedEnd).intValue();
          int totalTime = endTime - startTime;
          if (LOG.isInfoEnabled() && totalTime > POPULATION_TIME_UPPER_LIMIT) {
            LOG.info("Ganglia resource population time: " + totalTime);
          }
        }
      } catch (IOException e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
        }
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException e) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Unable to close http input steam : spec=" + spec, e);
            }
          }
        }
      }
      //todo: filter out resources and return keepers
      return Collections.emptySet();
    }