console/react/src/chord/data.js (189 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
import { MIN_CHORD_THRESHOLD } from "./matrix.js";
import { utils } from "../common/amqp/utilities.js";
const SAMPLES = 5; // number of snapshots to use for rate calculations
class ChordData {
// eslint-disable-line no-unused-vars
constructor(QDRService, isRate, converter) {
this.QDRService = QDRService;
this.last_matrix = undefined;
this.last_values = { values: undefined, timestamp: undefined };
this.rateValues = undefined;
this.snapshots = []; // last N values used for calculating rate
this.isRate = isRate;
// fn to convert raw data to matrix
this.converter = converter;
// object that determines which addresses are excluded
this.filter = [];
}
setRate(isRate) {
this.isRate = isRate;
}
setConverter(converter) {
this.converter = converter;
}
setFilter(filter) {
this.filter = filter;
}
getAddresses() {
let addresses = {};
let outer = this.snapshots;
if (outer.length === 0) outer = outer = [this.last_values];
outer.forEach(function(snap) {
snap.values.forEach(function(lv) {
if (!(lv.address in addresses)) {
addresses[lv.address] = this.filter.indexOf(lv.address) < 0;
}
}, this);
}, this);
return addresses;
}
getRouters() {
let routers = {};
let outer = this.snapshots;
if (outer.length === 0) outer = [this.last_values];
outer.forEach(function(snap) {
snap.values.forEach(function(lv) {
routers[lv.egress] = true;
routers[lv.ingress] = true;
});
});
return Object.keys(routers).sort();
}
applyFilter(filter) {
if (filter) this.setFilter(filter);
return new Promise(function(resolve) {
resolve(convert(this, this.last_values));
});
}
// construct a square matrix of the number of messages each router has egressed from each router
getMatrix() {
let self = this;
return new Promise(function(resolve, reject) {
// get the router.node and router.link info for interior routers
const interior = self.QDRService.management.topology
.nodeIdList()
.filter(n => utils.typeFromId(n) !== "_edge");
self.QDRService.management.topology.fetchEntities(
interior,
[
{ entity: "router.node", attrs: ["id", "index"] },
{
entity: "router.link",
attrs: ["linkType", "linkDir", "owningAddr", "ingressHistogram"]
}
],
function(results) {
if (!results) {
reject(Error("unable to fetch entities"));
return;
}
// the raw data received from the routers
let values = [];
// for each router in the network
for (let nodeId in results) {
// get a map of router ids to index into ingressHistogram for the links for this router.
// each routers has a different order for the routers
let routerNode = results[nodeId]["router.node"];
if (!routerNode) {
continue;
}
// ingressRouters is an array of router names in the same order
// that the ingressHistogram values will be in
let ingressRouters = self.QDRService.utilities
.flattenAll(routerNode)
.sort((a, b) => (a.index < b.index ? -1 : a.index > b.index ? 1 : 0))
.map(n => n.id);
// the name of the router we are working on
let egressRouter = self.QDRService.utilities.nameFromId(nodeId);
// loop through the router links for this router looking for
// out/endpoint/non-console links
let routerLinks = results[nodeId]["router.link"];
for (let i = 0; routerLinks && i < routerLinks.results.length; i++) {
let link = self.QDRService.utilities.flatten(
routerLinks.attributeNames,
routerLinks.results[i]
);
// if the link is an outbound/enpoint/non console
if (
link.linkType === "endpoint" &&
link.linkDir === "out" &&
link.owningAddr &&
!link.owningAddr.startsWith("Ltemp.") &&
!link.owningAddr.startsWith("M0$")
) {
// keep track of the raw egress values as well as their
// ingress and egress routers and the address
for (let j = 0; j < ingressRouters.length; j++) {
let messages = link.ingressHistogram ? link.ingressHistogram[j] : 0;
if (messages) {
values.push({
ingress: ingressRouters[j],
egress: egressRouter,
address: self.QDRService.utilities.addr_text(link.owningAddr),
messages: messages
});
}
}
}
}
}
// values is an array of objects like [{ingress: 'xxx', egress: 'xxx', address: 'xxx', messages: ###}, ....]
// convert the raw values array into a matrix object
let matrix = convert(self, values);
// resolve the promise
resolve(matrix);
}
);
});
}
convertUsing(converter) {
let values = this.isRate ? this.rateValues : this.last_values.values;
// convert the values to a matrix using the requested converter and the current filter
return converter(values, this.filter);
}
}
// Private functions
// compare the current values to the last_values and return the rate/second
let calcRate = function(values, last_values, snapshots) {
let now = Date.now();
if (!last_values.values) {
last_values.values = values;
last_values.timestamp = now - 1000;
}
// ensure the snapshots are initialized
if (snapshots.length < SAMPLES) {
for (let i = 0; i < SAMPLES; i++) {
if (snapshots.length < i + 1) {
snapshots[i] = JSON.parse(JSON.stringify(last_values));
snapshots[i].timestamp = now - 1000 * (SAMPLES - i);
}
}
}
// remove oldest sample
snapshots.shift();
// add the new values to the end.
snapshots.push(JSON.parse(JSON.stringify(last_values)));
let oldest = snapshots[0];
let newest = snapshots[snapshots.length - 1];
let rateValues = [];
let elapsed = (newest.timestamp - oldest.timestamp) / 1000;
let getValueFor = function(snap, value) {
for (let i = 0; i < snap.values.length; i++) {
if (
snap.values[i].ingress === value.ingress &&
snap.values[i].egress === value.egress &&
snap.values[i].address === value.address
)
return snap.values[i].messages;
}
};
values.forEach(function(value) {
let first = getValueFor(oldest, value);
let last = getValueFor(newest, value);
let rate = (last - first) / elapsed;
rateValues.push({
ingress: value.ingress,
egress: value.egress,
address: value.address,
messages: Math.max(rate, MIN_CHORD_THRESHOLD)
});
});
return rateValues;
};
let genKeys = function(values) {
values.forEach(function(value) {
value.key = value.egress + value.ingress + value.address;
});
};
let sortByKeys = function(values) {
return values.sort(function(a, b) {
return a.key > b.key ? 1 : a.key < b.key ? -1 : 0;
});
};
let convert = function(self, values) {
// sort the raw data by egress router name
genKeys(values);
sortByKeys(values);
self.last_values.values = JSON.parse(JSON.stringify(values));
self.last_values.timestamp = Date.now();
if (self.isRate) {
self.rateValues = values = calcRate(values, self.last_values, self.snapshots);
}
// convert the raw data to a matrix
let matrix = self.converter(values, self.filter);
self.last_matrix = matrix;
return matrix;
};
export { ChordData };