console/react/src/chord/data.js (189 lines of code) (raw):

/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/

import { MIN_CHORD_THRESHOLD } from "./matrix.js";
import { utils } from "../common/amqp/utilities.js";

const SAMPLES = 5; // number of snapshots to use for rate calculations

/**
 * Gathers per-link ingress histogram data from the interior routers and turns
 * it into a matrix using a pluggable converter function.
 *
 * Each raw value has the shape
 *   { ingress: 'xxx', egress: 'xxx', address: 'xxx', messages: ### }
 * and, when rate mode is on, `messages` is replaced by a per-second rate
 * computed over the last SAMPLES snapshots.
 */
class ChordData { // eslint-disable-line no-unused-vars
  /**
   * @param {object} QDRService - service object; this class reads
   *   `management.topology` and `utilities` from it.
   * @param {boolean} isRate - when truthy, matrices are built from rates
   *   instead of raw message counts.
   * @param {function} converter - called as `converter(values, filter)` to
   *   convert the raw values into a matrix.
   */
  constructor(QDRService, isRate, converter) {
    this.QDRService = QDRService;
    this.last_matrix = undefined;
    this.last_values = { values: undefined, timestamp: undefined };
    this.rateValues = undefined;
    // last N values used for calculating rate
    this.snapshots = [];
    this.isRate = isRate;
    // fn to convert raw data to matrix
    this.converter = converter;
    // list that determines which addresses are excluded
    this.filter = [];
  }

  /** Switch between raw counts (falsy) and rates (truthy). */
  setRate(isRate) {
    this.isRate = isRate;
  }

  /** Replace the function used to convert raw values into a matrix. */
  setConverter(converter) {
    this.converter = converter;
  }

  /** Replace the address filter that is handed to the converter. */
  setFilter(filter) {
    this.filter = filter;
  }

  /**
   * Collect every address seen in the snapshots (or in the last values when
   * no snapshot has been taken yet).
   *
   * @returns {object} map of address -> boolean; the boolean is false when
   *   the address is listed in the current filter and true otherwise.
   */
  getAddresses() {
    const addresses = {};
    knownValues(this).forEach(lv => {
      // an own-property check (rather than `in`) keeps addresses that happen
      // to be named like Object.prototype members, e.g. "constructor"
      if (!Object.prototype.hasOwnProperty.call(addresses, lv.address)) {
        addresses[lv.address] = this.filter.indexOf(lv.address) < 0;
      }
    });
    return addresses;
  }

  /**
   * Collect the names of every router that appears as an ingress or egress
   * in the snapshots (or in the last values when there are no snapshots).
   *
   * @returns {string[]} sorted router names
   */
  getRouters() {
    const routers = {};
    knownValues(this).forEach(lv => {
      routers[lv.egress] = true;
      routers[lv.ingress] = true;
    });
    return Object.keys(routers).sort();
  }

  /**
   * Optionally install a new filter, then re-convert the most recent values.
   *
   * @param {Array} [filter] - new filter; the current one is kept when falsy.
   * @returns {Promise} resolves with the converted matrix.
   */
  applyFilter(filter) {
    if (filter) this.setFilter(filter);
    // The arrow function keeps `this` bound to the instance; a plain function
    // executor runs with `this === undefined` in module (strict) code.
    return new Promise(resolve => {
      // convert() expects the array of values, not the {values, timestamp}
      // wrapper. Before the first poll there are no values yet, so fall back
      // to an empty list.
      resolve(convert(this, this.last_values.values || []));
    });
  }

  /**
   * Fetch router.node and router.link data for the interior routers, extract
   * the per-ingress-router message counts of each outbound endpoint link and
   * convert them into a matrix.
   *
   * @returns {Promise} resolves with the converted matrix; rejects when no
   *   results were returned or when processing the results throws.
   */
  getMatrix() {
    return new Promise((resolve, reject) => {
      // get the router.node and router.link info for interior routers
      const interior = this.QDRService.management.topology
        .nodeIdList()
        .filter(n => utils.typeFromId(n) !== "_edge");
      this.QDRService.management.topology.fetchEntities(
        interior,
        [
          { entity: "router.node", attrs: ["id", "index"] },
          {
            entity: "router.link",
            attrs: ["linkType", "linkDir", "owningAddr", "ingressHistogram"]
          }
        ],
        results => {
          if (!results) {
            reject(Error("unable to fetch entities"));
            return;
          }
          try {
            // values is an array of objects like
            // [{ingress: 'xxx', egress: 'xxx', address: 'xxx', messages: ###}, ....]
            const values = collectValues(this.QDRService.utilities, results);
            // convert the raw values array into a matrix object
            resolve(convert(this, values));
          } catch (err) {
            // without this the promise would stay pending forever
            reject(err);
          }
        }
      );
    });
  }

  /**
   * Convert the current values (rates in rate mode, raw counts otherwise)
   * with the supplied converter and the current filter. Does not change any
   * stored state.
   *
   * @param {function} converter - called as `converter(values, filter)`.
   * @returns {*} whatever the converter returns.
   */
  convertUsing(converter) {
    const values = this.isRate ? this.rateValues : this.last_values.values;
    // convert the values to a matrix using the requested converter and the current filter
    return converter(values, this.filter);
  }
}

// Private functions

// Flat list of every raw value held in the snapshots, or in last_values when
// no snapshot exists. Samples whose values are still undefined (nothing has
// been fetched yet) contribute nothing.
function knownValues(self) {
  const samples = self.snapshots.length === 0 ? [self.last_values] : self.snapshots;
  const all = [];
  samples.forEach(snap => {
    (snap.values || []).forEach(lv => all.push(lv));
  });
  return all;
}

// true for outbound endpoint links that own an address, skipping addresses
// that start with "Ltemp." or "M0$"
function isCountedLink(link) {
  return (
    link.linkType === "endpoint" &&
    link.linkDir === "out" &&
    Boolean(link.owningAddr) &&
    !link.owningAddr.startsWith("Ltemp.") &&
    !link.owningAddr.startsWith("M0$")
  );
}

// Build the raw values array from the fetchEntities results.
// `serviceUtils` is QDRService.utilities.
function collectValues(serviceUtils, results) {
  // the raw data received from the routers
  const values = [];
  // for each router in the network
  for (const nodeId in results) {
    // get a map of router ids to index into ingressHistogram for the links for this router.
    // each router has a different order for the routers
    const routerNode = results[nodeId]["router.node"];
    if (!routerNode) {
      continue;
    }
    // ingressRouters is an array of router names in the same order
    // that the ingressHistogram values will be in
    const ingressRouters = serviceUtils
      .flattenAll(routerNode)
      .sort((a, b) => (a.index < b.index ? -1 : a.index > b.index ? 1 : 0))
      .map(n => n.id);
    // the name of the router we are working on
    const egressRouter = serviceUtils.nameFromId(nodeId);
    // loop through the router links for this router looking for
    // out/endpoint/non-console links
    const routerLinks = results[nodeId]["router.link"];
    if (!routerLinks) {
      continue;
    }
    for (let i = 0; i < routerLinks.results.length; i++) {
      const link = serviceUtils.flatten(
        routerLinks.attributeNames,
        routerLinks.results[i]
      );
      // only outbound/endpoint/non-console links are counted
      if (!isCountedLink(link)) {
        continue;
      }
      // keep track of the raw egress values as well as their
      // ingress and egress routers and the address
      for (let j = 0; j < ingressRouters.length; j++) {
        const messages = link.ingressHistogram ? link.ingressHistogram[j] : 0;
        if (messages) {
          values.push({
            ingress: ingressRouters[j],
            egress: egressRouter,
            address: serviceUtils.addr_text(link.owningAddr),
            messages: messages
          });
        }
      }
    }
  }
  return values;
}

// compare the current values to the last_values and return the rate/second
function calcRate(values, last_values, snapshots) {
  const now = Date.now();
  if (!last_values.values) {
    last_values.values = values;
    last_values.timestamp = now - 1000;
  }
  // ensure the snapshots are initialized: pad with copies of last_values
  // whose timestamps are spaced one second apart
  for (let i = snapshots.length; i < SAMPLES; i++) {
    const seeded = JSON.parse(JSON.stringify(last_values));
    seeded.timestamp = now - 1000 * (SAMPLES - i);
    snapshots[i] = seeded;
  }
  // remove oldest sample
  snapshots.shift();
  // add the new values to the end.
  snapshots.push(JSON.parse(JSON.stringify(last_values)));

  const oldest = snapshots[0];
  const newest = snapshots[snapshots.length - 1];
  const elapsed = (newest.timestamp - oldest.timestamp) / 1000;

  // message count recorded in `snap` for the same ingress/egress/address,
  // or undefined when the snapshot has no such entry
  const messagesFor = function(snap, value) {
    const match = (snap.values || []).find(
      candidate =>
        candidate.ingress === value.ingress &&
        candidate.egress === value.egress &&
        candidate.address === value.address
    );
    return match ? match.messages : undefined;
  };

  return values.map(value => {
    const first = messagesFor(oldest, value);
    const last = messagesFor(newest, value);
    // A flow that is missing from either end of the window has no baseline
    // to measure against. Treat it as "no change" instead of letting
    // undefined turn the rate into NaN (Math.max(NaN, x) is NaN).
    const delta = first === undefined || last === undefined ? 0 : last - first;
    const rate = elapsed > 0 ? delta / elapsed : 0;
    return {
      ingress: value.ingress,
      egress: value.egress,
      address: value.address,
      messages: Math.max(rate, MIN_CHORD_THRESHOLD)
    };
  });
}

// give each value a sort key built from its egress, ingress and address
function genKeys(values) {
  values.forEach(function(value) {
    value.key = value.egress + value.ingress + value.address;
  });
}

// sort the values in place by key (ascending) and return the same array
function sortByKeys(values) {
  return values.sort(function(a, b) {
    return a.key > b.key ? 1 : a.key < b.key ? -1 : 0;
  });
}

// Record `values` as the latest sample on `self`, replace them by rates when
// in rate mode, and run the converter. The result is also stored in
// self.last_matrix.
function convert(self, values) {
  // sort the raw data by key (egress router name first)
  genKeys(values);
  sortByKeys(values);

  self.last_values.values = JSON.parse(JSON.stringify(values));
  self.last_values.timestamp = Date.now();
  if (self.isRate) {
    self.rateValues = values = calcRate(values, self.last_values, self.snapshots);
  }
  // convert the raw data to a matrix
  const matrix = self.converter(values, self.filter);
  self.last_matrix = matrix;
  return matrix;
}

export { ChordData };