in ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java [425:545]
/**
 * Requests the Ganglia metrics feed for the configured cluster, hosts and
 * metrics, parses it line by line and applies every metric found to the
 * resources registered under the metric's host/cluster key.
 *
 * <p>Feed layout as consumed by this method (one item per line):
 * <ol>
 *   <li>a numeric start marker;</li>
 *   <li>zero or more metric records, each made of: ds name, cluster name,
 *       host name, metric name, first timestamp, step, then datapoint
 *       values terminated by {@code [AMBARI_DP_END]};</li>
 *   <li>the {@code [AMBARI_END]} marker;</li>
 *   <li>a numeric end marker.</li>
 * </ol>
 *
 * <p>If the stream ends before a record is complete, the method logs the
 * condition and returns; resources populated from earlier records keep
 * their values.
 *
 * @return always an empty set in the current implementation (see the todo
 *         at the end of the method); results are delivered by mutating the
 *         registered resources
 * @throws SystemException declared by the signature; nothing in this body
 *         throws it directly (NOTE(review): presumably raised by helpers
 *         such as getSpec/populateResource - confirm)
 */
public Collection<Resource> populateResources() throws SystemException {
  // Full URL, including the query string.
  String specWithParams = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);

  // Split into URL and parameters so the parameters can be sent as the POST
  // body. split() with a limit of 2 always yields at least one element, so
  // tokens[0] is safe; a second element exists only when a '?' was present.
  String[] tokens = specWithParams.split("\\?", 2);
  String spec = tokens[0];
  String params = null;
  if (tokens.length > 1) {
    params = tokens[1];
  } else {
    LOG.info("No request parameters found in ganglia spec => " + spec);
  }

  BufferedReader reader = null;
  try {
    // Check if host is live
    if (!hostProvider.isGangliaCollectorHostLive(clusterName)) {
      LOG.info("Ganglia host is not live");
      return Collections.emptySet();
    }

    // Check if Ganglia server component is live
    if (!hostProvider.isGangliaCollectorComponentLive(clusterName)) {
      LOG.info("Ganglia server component is not live");
      return Collections.emptySet();
    }

    reader = new BufferedReader(new InputStreamReader(
        getStreamProvider().readFrom(spec, "POST", params)));

    String feedStart = reader.readLine();
    if (feedStart == null || feedStart.isEmpty()) {
      LOG.info("Empty feed while getting ganglia metrics for spec => " +
          spec);
      return Collections.emptySet();
    }
    int startTime = convertToNumber(feedStart).intValue();

    String dsName = reader.readLine();
    if (dsName == null || dsName.isEmpty()) {
      LOG.info("Feed without body while reading ganglia metrics for spec " +
          "=> " + spec);
      return Collections.emptySet();
    }

    while (!dsName.equals("[AMBARI_END]")) {
      GangliaMetric metric = new GangliaMetric();
      List<GangliaMetric.TemporalMetric> listTemporalMetrics =
          new ArrayList<GangliaMetric.TemporalMetric>();

      metric.setDs_name(dsName);
      metric.setCluster_name(reader.readLine());
      metric.setHost_name(reader.readLine());
      metric.setMetric_name(reader.readLine());

      // readLine() returns null at end of stream; check before converting so
      // a truncated record is reported instead of failing inside the parser.
      String timeLine = reader.readLine();
      String stepLine = reader.readLine();
      if (timeLine == null || stepLine == null) {
        LOG.info("Unexpected end of stream reached while getting ganglia " +
            "metrics for spec => " + spec);
        return Collections.emptySet();
      }
      int time = convertToNumber(timeLine).intValue();
      int step = convertToNumber(stepLine).intValue();

      // Datapoints are implicit-timestamp: the n-th value belongs to
      // time + n * step.
      String val = reader.readLine();
      while (val != null && !val.equals("[AMBARI_DP_END]")) {
        GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(val, time);
        if (tm.isValid()) {
          listTemporalMetrics.add(tm);
        }
        time += step;
        val = reader.readLine();
      }
      if (val == null) {
        // Stream ended before the datapoint terminator: the record is
        // incomplete, so do not apply it.
        LOG.info("Unexpected end of stream reached while getting ganglia " +
            "metrics for spec => " + spec);
        return Collections.emptySet();
      }

      metric.setDatapointsFromList(listTemporalMetrics);

      ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
      Set<Resource> resourceSet = resources.get(key);
      if (resourceSet != null) {
        for (Resource resource : resourceSet) {
          populateResource(resource, metric);
        }
      }

      dsName = reader.readLine();
      if (dsName == null || dsName.isEmpty()) {
        LOG.info("Unexpected end of stream reached while getting ganglia " +
            "metrics for spec => " + spec);
        return Collections.emptySet();
      }
    }

    // The line after [AMBARI_END] is a numeric end marker; its difference
    // from the start marker is reported when it exceeds the configured limit.
    String feedEnd = reader.readLine();
    if (feedEnd == null || feedEnd.isEmpty()) {
      LOG.info("Error reading end of feed while getting ganglia metrics " +
          "for spec => " + spec);
    } else {
      int endTime = convertToNumber(feedEnd).intValue();
      int totalTime = endTime - startTime;
      if (LOG.isInfoEnabled() && totalTime > POPULATION_TIME_UPPER_LIMIT) {
        LOG.info("Ganglia resource population time: " + totalTime);
      }
    }
  } catch (IOException e) {
    if (LOG.isErrorEnabled()) {
      LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
    }
  } finally {
    // Runs on every exit path above, including the early returns.
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Unable to close http input steam : spec=" + spec, e);
        }
      }
    }
  }
  //todo: filter out resources and return keepers
  return Collections.emptySet();
}