tools/owperf/owperf.js (664 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * This is a test tool for measuring the performance of OpenWhisk actions and rules.
 * The full documentation of the tool is available in README.md .
 */

const fs = require('fs');
const ini = require('ini');
const cluster = require('cluster');
const openwhisk = require('openwhisk');
const program = require('commander');
const exec = require('node-exec-promise').exec;

const ACTION = "action";
const RULE = "rule";
const RESULT = "result";
const ACTIVATION = "activation";
const NONE = "none";

// Names of all latency statistics collected per worker and reported in the output record
const LAT_STAT_NAMES = ["ta", "oea", "oer", "d", "ad", "ora", "rtt", "ortt"];

/**
 * Option coercion callback for commander: parse a decimal integer.
 * commander passes the previous/default value as the second argument; it is deliberately
 * ignored so that it is never interpreted as a radix by parseInt.
 * @param {string} strval the raw option value from the command line
 * @param {*} defval previous/default value supplied by commander (unused)
 * @returns {number} the parsed integer (NaN if strval is not numeric)
 */
function parseIntDef(strval, defval) {
    return parseInt(strval, 10);
}

/**
 * @param {*} value
 * @returns {boolean} true only for real numbers (0 included), false for undefined/null/NaN/non-numbers
 */
function isNumber(value) {
    return typeof value === 'number' && !Number.isNaN(value);
}

program
    .description('Latency and throughput measurement of OpenWhisk actions and rules')
    .version('0.0.1')
    .option('-a, --activity <action/rule>', "Activity to measure", /^(action|rule)$/i, "action")
    .option('-b, --blocking <result/activation/none>', "For actions, wait until result or activation, or don't wait", /^(result|activation|none)$/i, "none")
    .option('-d, --delta <msec>', "Time diff between consequent invocations of the same worker, in msec", parseIntDef, 200)
    .option('-i, --iterations <count>', "Number of measurement iterations", parseIntDef)
    .option('-p, --period <msec>', "Period of measurement in msec", parseIntDef)
    .option('-r, --ratio <count>', "How many actions per iteration (or rules per trigger)", parseIntDef, 1)
    .option('-s, --parameter_size <size>', "Size of string parameter passed to trigger or actions", parseIntDef, 1000)
    .option('-w, --workers <count>', "Total number of concurrent workers incl. master", parseIntDef, 1)
    .option('-A, --master_activity <action/rule>', "Set master activity apart from other workerss", /^(action|rule)$/i)
    .option('-B, --master_blocking <result/activation/none>', "Set master blocking apart from other workers", /^(result|activation|none)$/i)
    .option('-D, --master_delta <msec>', "Set master delta apart from other workers", parseIntDef)
    .option('-u, --warmup <count>', "How many invocations to perform at each worker as warmup", parseIntDef, 5)
    .option('-l, --delay <msec>', "How many msec to delay at each action", parseIntDef, 50)
    .option('-P --pp_delay <msec>', "Wait for remaining activations to finalize before post-processing", parseIntDef, 60000)
    .option('-G --burst_timing', "For actions, use the same invocation timing (BI) for all actions in a burst")
    .option('-S --no-setup', "Skip test setup (so use previous setup)")
    .option('-T --no-teardown', "Skip test teardown (to allow setup reuse)")
    .option('-f --config_file <filepath>', "Specify a wskprops configuration file to use", `${process.env.HOME}/.wskprops`)
    .option('-q, --quiet', "Suppress progress information on stderr");

program.parse(process.argv);

const testRecord = {input: {}, output: {}};     // holds the final test data

for (const opt in program.opts())
    if (typeof program[opt] != 'function')
        testRecord.input[opt] = program[opt];

// If neither period nor iterations are set, then period is set by default to 1000 msec
if (!testRecord.input.iterations && !testRecord.input.period)
    testRecord.input.period = 1000;

// If either master_activity, master_blocking or master_delta are set, then test is in 'master apart' mode
testRecord.input.master_apart = ((testRecord.input.master_activity || testRecord.input.master_blocking || testRecord.input.master_delta) && true);

mLog("Parameter Configuration:");
for (const opt in testRecord.input)
    mLog(`${opt} = ${testRecord.input[opt]}`);
mLog("-----\n");

mLog("Generating invocation parameters");
const inputMessage = "A".repeat(testRecord.input.parameter_size);
const params = {sleep: testRecord.input.delay, message: inputMessage};

mLog("Loading wskprops");
const config = ini.parse(fs.readFileSync(testRecord.input.config_file, "utf-8"));
mLog("APIHOST = " + config.APIHOST);
// NOTE(review): the next line writes the API key to stderr unless --quiet is set - confirm this is intended
mLog("AUTH = " + config.AUTH);
mLog("-----\n");

const wskParams = `--apihost ${config.APIHOST} --auth ${config.AUTH} -i`;     // to be used when invoking setup and teardown via external wsk

// openwhisk client used for invocations
const ow = openwhisk({apihost: config.APIHOST, api_key: config.AUTH, ignore_certs: true});

// counters for throughput computation (all)
const tpCounters = {attempts: 0, invocations: 0, activations: 0, requests: 0, errors: 0};

// counters for latency computation - every field stays undefined until the first sample of that statistic arrives
const latCounters = {};
LAT_STAT_NAMES.forEach(statName => {
    latCounters[statName] = {sum: undefined, sumSqr: undefined, min: undefined, max: undefined};
});

const measurementTime = {start: -1, stop: -1};

const sampleData = [];      // array of samples (tuples of collected invocation data, for rule or for action, depending on the activity)

let loopSleeper;            // used to abort sleep in mainLoop()
let abort = false;          // used to abort the loop in mainLoop()

// Used only at the master
const workerData = [];      // holds data for each worker, at [1..#workers]. Master's entry is 0.

const activity = ((cluster.isWorker || !testRecord.input.master_activity) ? testRecord.input.activity : testRecord.input.master_activity);

// Barrier counters. They are declared before the main dispatch below because mainLoop() starts
// executing synchronously at a worker and reads remainingIterations in its first iteration.
let remainingInits = testRecord.input.workers;      // workers (incl. master) that did not finish warmup yet
let remainingIterations = -1;                       // -1 means "no iterations limit"
let remainingExits = testRecord.input.workers;      // workers (incl. master) that did not finish yet
let remainingSummaries = testRecord.input.workers;  // workers (incl. master) that did not report a summary yet

// Any failure escaping the master/worker flow is fatal: report it and exit with an error code
(cluster.isMaster ? runMaster() : runWorker()).catch(err => {
    mLog(`FATAL: ${err}`);
    process.exit(1);
});

// -------- END OF MAIN -------------

/**
 * Master operation: set up the test assets, fork the workers, run the master's own invocation loop,
 * then stop the measurement, post-process the master's samples and report them to the summary barrier.
 * @returns {Promise<void>} rejects if post-processing at the master fails
 */
async function runMaster() {

    // Setup OpenWhisk assets for the test
    await testSetup();

    // Start workers. The master counts as one worker, so fork only (workers - 1) times
    for (let i = 1; i < testRecord.input.workers; i++)
        cluster.fork();

    // Configure interaction with each worker
    for (const id in cluster.workers) {
        const worker = cluster.workers[id];

        // Exit handler for each worker
        worker.on('exit', (code, signal) => {
            if (signal)
                mLog(`Worker ${id} was killed by signal: ${signal}`);
            else if (code !== 0)
                mLog(`Worker ${id} exited with error code: ${code}`);
            checkExit();
        });

        // Message handler for each worker
        worker.on('message', (msg) => {
            if (msg.init)
                // Initialization barrier for workers. Makes sure they are all fully engaged when the measurement start
                checkInit();

            if (msg.summary) {
                workerData[id] = msg.summary;
                checkSummary();
            }
        });
    }

    await mainLoop();

    // set finish of measurement and notify all other workers
    measurementTime.stop = new Date().getTime();
    testRecord.output.measure_time = (measurementTime.stop - measurementTime.start) / 1000.0;   // measurement duration converted to seconds
    mLog(`Stop measurement. Start post-processing after ${testRecord.input.pp_delay} msec`);
    mLogSampleHeader();
    for (const j in cluster.workers)
        cluster.workers[j].send({abort: measurementTime});

    // The master's post-processing to generate its workerData
    await sleep(testRecord.input.pp_delay);
    try {
        await postProcess();
        // The master's workerData
        workerData[0] = {lat: latCounters, tp: tpCounters};
        checkSummary();
    }
    catch (err) {   // FATAL - shouldn't happen unless BUG
        mLog(`Post-process ERROR in MASTER: ${err}`);
        throw err;
    }
}


/**
 * Setup assets before the test depending on configuration.
 * Runs the external setup script unless setup was disabled; a setup failure terminates the process.
 */
async function testSetup() {
    if (!testRecord.input.setup)
        return;

    const cmd = `./setup.sh s ${testRecord.input.ratio} ${wskParams}`;
    mLog(`SETUP: ${cmd}`);
    try {
        await exec(cmd);
    }
    catch (error) {
        mLog(`FATAL: setup failure - ${error}`);
        process.exit(-2);
    }
}


/**
 * Teardown assets after the test depending on configuration.
 * Runs the external setup script in teardown mode unless teardown was disabled.
 */
async function testTeardown() {
    if (!testRecord.input.teardown)
        return;

    const cmd = `./setup.sh t ${testRecord.input.ratio} ${wskParams}`;
    mLog(`TEARDOWN: ${cmd}`);
    try {
        await exec(cmd);
    }
    catch (error) {
        mLog(`WARNING: teardown error - ${error}`);
        process.exit(-3);
    }
}


/**
 * Print table header for samples to the runtime log on stderr
 */
function mLogSampleHeader() {
    mLog("bi,\tas,\tae,\tts,\tta,\toea,\toer,\td,\tad,\tai,\tora,\trtt,\tortt");
}


/**
 * Worker operation: run the invocation loop until the master aborts it, post-process the collected
 * samples, send the summary to the master and exit.
 * @returns {Promise<void>} rejects if post-processing or reporting at the worker fails
 */
async function runWorker() {

    // abort message from master will set the measurement time frame and abort the loop
    process.on('message', (msg) => {
        if (msg.abort) {
            // Set the measurement time frame at the worker - required for post-processing
            measurementTime.start = msg.abort.start;
            measurementTime.stop = msg.abort.stop;
            abortLoop();
        }
    });

    await mainLoop();
    await sleep(testRecord.input.pp_delay);
    try {
        await postProcess();
        // Exit only from the send callback: process.send() is asynchronous, so exiting right after
        // the call could drop the summary and leave the master waiting for it forever.
        process.send({summary: {lat: latCounters, tp: tpCounters}}, () => process.exit());
    }
    catch (err) {   // shouldn't happen unless BUG
        mLog(`Post-process ERROR in WORKER: ${err}`);
        throw err;
    }
}


/**
 * Barrier for checking all workers have initialized and then start measurement.
 * Called once per worker (incl. master) when its warmup is done.
 */
function checkInit() {
    remainingInits--;
    if (remainingInits === 0) {     // all workers are engaged (incl. master) - can start measurement
        mLog("All clients finished warmup. Start measurement.");
        measurementTime.start = new Date().getTime();

        if (testRecord.input.period)
            setTimeout(abortLoop, testRecord.input.period);

        if (testRecord.input.iterations)
            remainingIterations = testRecord.input.iterations;
    }
}


/**
 * Barrier for checking all workers have finished, generate output and exit
 */
function checkExit() {
    remainingExits--;
    if (remainingExits === 0) {
        mLog("All workers finished - generating output and exiting.");
        generateOutput();

        // Cleanup test assets from OW and then exit
        testTeardown().then(() => {
            mLog("Done");
            process.exit();
        });
    }
}


/**
 * Barrier for receiving post-processing results from all workers before computing final results
 */
function checkSummary() {
    remainingSummaries--;
    if (remainingSummaries === 0) {
        mLogSampleHeader();
        mLog("All clients post-processing completed - computing output.");
        computeOutputRecord();
        checkExit();
    }
}


/**
 * Main loop for invocations - invoke activity asynchronously once every (delta) msec until aborted
 */
async function mainLoop() {
    let warmupCounter = testRecord.input.warmup;
    // In 'master apart' mode the master may use its own delta/blocking; workers always use the common ones
    const delta = ((cluster.isWorker || !testRecord.input.master_delta) ? testRecord.input.delta : testRecord.input.master_delta);
    const blocking = ((cluster.isWorker || !testRecord.input.master_blocking) ? testRecord.input.blocking : testRecord.input.master_blocking);
    const doBlocking = (blocking != NONE);
    const getResult = (blocking == RESULT);

    while (!abort) {

        // ----
        // Pass init (worker - send message) after <warmup> iterations
        if (warmupCounter === 0) {
            if (cluster.isMaster)
                checkInit();
            else    // worker - send init
                process.send({init: 1});
        }

        if (warmupCounter >= 0)     // take 0 down to -1 to make sure it does not trigger another init message
            warmupCounter--;
        // ----

        // If iterations limit set, abort loop when finished iterations
        if (remainingIterations === 0) {
            abortLoop();
            continue;
        }

        if (remainingIterations > 0)
            remainingIterations--;

        const si = new Date().getTime();    // SI = Start of Iteration timestamp

        let samples;
        if (activity == ACTION)
            samples = await invokeActions(testRecord.input.ratio, doBlocking, getResult, si);
        else
            samples = await invokeRules(si);

        samples.forEach(sample => {
            sampleData.push(sample);
        });

        const ei = new Date().getTime();    // EI = End of Iteration timestamp
        const duration = ei - si;
        if (delta > duration) {
            loopSleeper = sleep(delta - duration);
            if (!abort)     // check again to avoid race condition on loopSleeper
                await loopSleeper;
        }
    }
}


/**
 * Used to abort the mainLoop() function
 */
function abortLoop() {
    abort = true;
    if (loopSleeper)
        loopSleeper.resolve();
}


/**
 * Invoke the predefined OW action a specified number of times without waiting (burst).
 * @param {number} count number of invocations in the burst
 * @param {boolean} doBlocking whether each invocation is blocking
 * @param {boolean} getResult whether a blocking invocation returns the action result instead of the activation
 * @param {number} burst_bi timestamp used as BI for all invocations when burst_timing is set
 * @returns {Promise<Array>} resolves to one sample per invocation: {aaid, bi, ai} or {aaidError}
 */
function invokeActions(count, doBlocking, getResult, burst_bi) {
    const invocations = [];     // array of invocation promises
    for (let i = 0; i < count; i++)
        invocations.push(invokeOneAction(doBlocking, getResult, burst_bi));
    // No contained promise rejects - failures are reported inside the samples
    return Promise.all(invocations);
}


/**
 * Invoke the predefined OW action once. Never rejects - a failure is returned as {aaidError}.
 */
async function invokeOneAction(doBlocking, getResult, burst_bi) {
    const bi = (testRecord.input.burst_timing ? burst_bi : new Date().getTime());   // default is BI per invocation
    try {
        // If returnedJSON is full activation or just activation ID then activation ID should be in "activationId" field
        // If returnedJSON is the result of the test action, then "activationId" is part of the returned result of the test action
        const returnedJSON = await ow.actions.invoke({name: 'testAction', blocking: doBlocking, result: getResult, params: params});
        const ai = (doBlocking ? new Date().getTime() : undefined);    // only for blocking invocations, AI is meaningful
        return {aaid: returnedJSON.activationId, bi: bi, ai: ai};
    }
    catch (err) {
        return {aaidError: err};
    }
}


/**
 * Fire the predefined OW trigger to invoke the rules bound to it.
 * @param {number} bi timestamp taken before the invocation
 * @returns {Promise<Array>} resolves to an array with a single sample: {taid, bi} or {taidError}
 */
async function invokeRules(bi) {
    try {
        const triggerActivationIdJSON = await ow.triggers.invoke({name: 'testTrigger', params: params});
        return [{taid: triggerActivationIdJSON.activationId, bi: bi}];
    }
    catch (err) {
        return [{taidError: err}];
    }
}


/**
 * This function processes the sampleData. Each sample is processed as following:
 * 1. A sample with error (TAID or AAID) is processed directly (not much to do beyond counting errors)
 * 2. An action sample - first attempt to retrieve activation, then process with it
 * 3. A rule sample - first convert to set of bound action samples (by processing the trigger activation), then process each action sample in step 2 above
 */
async function postProcess() {
    for (const sample of sampleData) {
        if (activity == ACTION) {
            await processSampleWithAction(sample);
        }
        else if (sample.taidError) {    // activity == RULE
            // TAID error - no need to retrieve bound actions - move to process the sample directly
            processSample(sample);
        }
        else {      // have valid TAID - retrieve bound action ids and then process
            const actionSamples = await getActionSamplesOfRules(sample);
            for (const actionSample of actionSamples)
                await processSampleWithAction(actionSample);
        }
    }
}


/**
 * Retrieve the activation ids of the actions bound to the trigger activation of the given sample.
 * Failure to retrieve trigger activation for a valid activation id is considered a fatal error, since the activation must exist.
 * @param {*} triggerSample trigger sample holding a valid taid; its ts field is set from the trigger activation
 * @returns {Promise<Array>} resolves to one action sample (copy of the trigger sample + aaid or aaidError) per log record
 */
async function getActionSamplesOfRules(triggerSample) {
    try {
        const triggerActivation = await ow.activations.get({name: triggerSample.taid});
        triggerSample.ts = triggerActivation.start;
        // Each log entry of the trigger activation is parsed as a JSON record of one bound action
        return triggerActivation.logs.map(logRecord => {
            const boundActionRecord = JSON.parse(logRecord);
            const actionSample = Object.assign({}, triggerSample);
            if (boundActionRecord.success)
                actionSample.aaid = boundActionRecord.activationId;
            else
                actionSample.aaidError = boundActionRecord.error;
            return actionSample;
        });
    }
    catch (err) {   // FATAL: failed to retrieve trigger activation for a valid id
        mLog(`getActionSamplesOfRules returned ERROR: ${err}`);
        throw err;
    }
}


/**
 * Processing each action sample sequentially, i.e., wait until activation is retrieved before retrieving the next one.
 * Otherwise, concurrent retrieval of possibly thousands of activations and more, may cause issues.
 * Failure to retrieve activation record for a valid id is ok, assuming the action may have not completed yet.
 * @param {*} actionSample
 */
async function processSampleWithAction(actionSample) {
    if (actionSample.aaidError) {   // no activation, move on to processing sample with error
        processSample(actionSample);
        return;
    }

    // have activation id, try to get record
    let activation;
    try {
        activation = await ow.activations.get({name: actionSample.aaid});
    }
    catch (err) {
        mLog(`Failed to retrieve activation for id: ${actionSample.aaid} for reason: ${err}`);
    }
    processSample(actionSample, activation);
}


/**
 * Process a single sample + optional related action activation, updating latency and throughput counters
 * @param {*} sample
 * @param {*} activation the retrieved action activation record, undefined if not available
 */
function processSample(sample, activation) {
    const bi = sample.bi;

    if (bi < measurementTime.start || bi > measurementTime.stop) {  // BI outside time frame. No further processing.
        mLog(`Sample discarded. BI exceeds measurement time frame`);
        return;
    }

    tpCounters.attempts++;  // each sample invoked in the time frame counts as one invocation attempt

    if (sample.taidError) { // trigger activation failed - count one request, one error. No further processing.
        tpCounters.requests++;
        tpCounters.errors++;
        mLog(`Sample discarded. Trigger activation error: ${sample.taidError}`);
        return;
    }

    let ts;     // stays undefined for samples without a trigger activation
    if (sample.ts) {
        ts = parseInt(sample.ts, 10);
        if (ts >= measurementTime.start && ts <= measurementTime.stop) {    // trigger activation in time frame - count one activation, one request
            tpCounters.activations++;
            tpCounters.requests++;
        }
    }

    if (sample.aaidError) { // action activation failed - count one request, one error. No further processing.
        tpCounters.requests++;
        tpCounters.errors++;
        mLog(`Sample discarded. Action activation error: ${sample.aaidError}`);
        return;
    }

    if (!activation) {      // no activation, so assumed incomplete. No further processing.
        mLog(`Sample discarded. Activation was not retrieved.`);
        return;
    }

    const as = parseInt(activation.start, 10);
    const ae = parseInt(activation.end, 10);
    const d = parseInt(activation.response.result.duration, 10);

    if (as < measurementTime.start || ae > measurementTime.stop) {  // got activation, but it exceeds the time frame. No further processing.
        mLog(`Sample discarded. Action activation exceeded measurement time frame.`);
        return;
    }

    // Activation is in time frame, so count one activation, one request and one full invocation
    tpCounters.activations++;
    tpCounters.requests++;
    tpCounters.invocations++;

    // For full invocations, update latency counters
    const ta = (isNumber(ts) ? as - ts : undefined);
    const ad = ae - as;
    const oea = as - bi;
    const oer = ae - bi - d;

    updateLatSample("d", d);
    updateLatSample("ta", ta);
    updateLatSample("ad", ad);
    updateLatSample("oea", oea);
    updateLatSample("oer", oer);

    // for blocking action invocations - will be "undefined" otherwise
    const ai = sample.ai;
    const ora = (isNumber(ai) ? ai - ae : undefined);
    const rtt = (isNumber(ai) ? ai - bi : undefined);
    const ortt = (isNumber(rtt) ? rtt - d : undefined);

    updateLatSample("ora", ora);
    updateLatSample("rtt", rtt);
    updateLatSample("ortt", ortt);

    mLog(`${bi},\t${as},\t${ae},\t${ts},\t${ta},\t${oea},\t${oer},\t${d},\t${ad},\t${ai},\t${ora},\t${rtt},\t${ortt}`);
}


/**
 * Update counters of one latency statistic of a worker with value data from one sample.
 * Samples that are not real numbers (undefined, NaN) are skipped; a value of 0 is a valid sample.
 * @param {string} statName name of the latency statistic (a key of latCounters)
 * @param {number|undefined} value the sample value
 */
function updateLatSample(statName, value) {
    if (!isNumber(value))   // value == undefined (or NaN) => skip it
        return;

    const counters = latCounters[statName];

    // Update sum for avg
    counters.sum = (counters.sum == null ? 0 : counters.sum) + value;

    // Update sumSqr for std
    counters.sumSqr = (counters.sumSqr == null ? 0 : counters.sumSqr) + value * value;

    // Update min value
    if (counters.min == null || counters.min > value)
        counters.min = value;

    // Update max value
    if (counters.max == null || counters.max < value)
        counters.max = value;
}


/**
 * Compute the final output record based on the workerData records.
 * The output of the program is a single CSV row of data consisting of the input parameters,
 * then latencies computed above - avg (average) and std (std. dev.), then throughput.
 */
function computeOutputRecord() {
    // Latency stats: avg, std, min, max
    LAT_STAT_NAMES.forEach(statName => {
        testRecord.output[statName] = computeLatStats(statName);
    });

    // Tp stats: abs, tp, tpw, tpd
    ["attempts", "invocations", "activations", "requests"].forEach(statName => {
        testRecord.output[statName] = computeTpStats(statName);
    });

    // Error stats: abs, percent
    testRecord.output.errors = computErrorStats();
}


/**
 * Based on workerData, compute average, standard deviation, min and max of a given latency statistic.
 * In master_apart mode only the master's data is used, otherwise the data of all workers.
 * @param {*} statName
 * @returns {{avg, std, min, max}} all fields are undefined (not NaN) if the statistic has no samples
 */
function computeLatStats(statName) {
    let totalSum = 0;
    let totalSumSqr = 0;
    let totalInvocations = 0;
    let hasSamples = false;     // does the current stat have any samples. If not => undefined, not NaN
    let min;
    let max;

    // Fold the counters of one worker into the totals
    const accumulate = (wd) => {
        const lat = wd.lat[statName];
        if (lat.sum != null) {  // this worker has valid latency samples (counters that were never set arrive as undefined)
            hasSamples = true;
            totalSum += lat.sum;
            totalSumSqr += lat.sumSqr;
            if (min == null || min > lat.min)
                min = lat.min;
            if (max == null || max < lat.max)
                max = lat.max;
        }
        totalInvocations += wd.tp.invocations;
    };

    if (testRecord.input.master_apart)
        // in master_apart mode, only master performs latency measurements
        accumulate(workerData[0]);
    else
        // in regular mode, all workers participate in latency measurements
        workerData.forEach(accumulate);

    const avg = (hasSamples ? totalSum / totalInvocations : undefined);
    // Clamp at 0: floating point error can make the variance of (nearly) identical samples slightly negative
    const std = (hasSamples ? Math.sqrt(Math.max(0, totalSumSqr / totalInvocations - avg * avg)) : undefined);
    return ({avg: avg, std: std, min: min, max: max});
}


/**
 * Based on workerData, compute throughput of a given counter, with (tp) and without (tpw) the master, and the percent difference (tpd)
 * @param {*} statName
 */
function computeTpStats(statName) {
    const masterCount = workerData[0].tp[statName];
    let totalCount = 0;
    workerData.forEach(wd => {
        totalCount += wd.tp[statName];
    });
    const tp = totalCount / testRecord.output.measure_time;                     // throughput
    const tpw = (totalCount - masterCount) / testRecord.output.measure_time;    // throughput without master
    const tpd = (tp - tpw) * 100.0 / tp;                                        // percent difference relative to TP
    return ({abs: totalCount, tp: tp, tpw: tpw, tpd: tpd});
}


/**
 * Based on workerData, compute the relative portion of total errors out of total requests
 */
function computErrorStats() {
    let totalErrors = 0;
    let totalRequests = 0;
    workerData.forEach(wd => {
        totalErrors += wd.tp.errors;
        totalRequests += wd.tp.requests;
    });
    const errAbs = totalErrors;
    const errPer = totalErrors * 100.0 / totalRequests;
    return ({abs: errAbs, percent: errPer});
}


/**
 * Generate a properly formatted output record to stdout. The header is also printed, but via mWrite to stderr and can be
 * silenced.
 */
function generateOutput() {
    let first = true;

    // First, print header to stderr
    dfsObject(testRecord, (name, data, isRoot, isObj) => {
        if (!isObj) {   // print leaf nodes
            if (!first)
                mWrite(",\t");
            first = false;
            mWrite(`${name}`);
        }
    });
    mWrite("\n");

    first = true;

    // Now, print data to stdout
    dfsObject(testRecord, (name, data, isRoot, isObj) => {
        if (!isObj) {   // print leaf nodes
            if (!first)
                process.stdout.write(",\t");
            first = false;
            if (typeof data == 'number')    // round each number to 3 decimal digits
                data = round(data, 3);
            process.stdout.write(`${data}`);
        }
    });
    process.stdout.write("\n");
}


/**
 * Sleep for a given time. Useful mostly with await from an async function.
 * resolve and reject are externalized as properties to allow early abortion; settling the promise
 * early also cancels the pending timer.
 * @param {*} ms time to sleep in msec
 * @returns {Promise} a promise with additional resolve() and reject() methods
 */
function sleep(ms) {
    let timer;
    let res;
    let rej;
    const p = new Promise((resolve, reject) => {
        timer = setTimeout(resolve, ms);
        res = resolve;
        rej = reject;
    });
    p.resolve = (value) => {
        clearTimeout(timer);
        res(value);
    };
    p.reject = (reason) => {
        clearTimeout(timer);
        rej(reason);
    };
    return p;
}


/**
 * Generate a random integer in the range of [1..max]
 * @param {*} max
 */
function getRandomInt(max) {
    return Math.floor(Math.random() * Math.floor(max) + 1);
}


/**
 * Round a number after specified decimal digits
 * @param {*} num
 * @param {*} digits
 */
function round(num, digits = 0) {
    const factor = Math.pow(10, digits);
    return Math.round(num * factor) / factor;
}


// If not quiet, emit control messages on stderr (with newline)
function mLog(text) {
    if (!testRecord.input.quiet)
        console.error(`${clientId()}:\t${text}`);
}


/**
 * Return the id of the client - MASTER-0 or WORKER-k (k=1..w-1)
 */
function clientId() {
    return (cluster.isMaster ? "MASTER-0" : `WORKER-${cluster.worker.id}`);
}


// If not quiet, write strings on stderr (w/o newline)
function mWrite(text) {
    if (!testRecord.input.quiet)
        process.stderr.write(text);
}


/**
 * Traverse a (potentially deep) object in DFS, visiting each non-function node with function func.
 * func is called as func(name, data, isRoot, isObj), where name is the dot-separated path of the node.
 * @param {*} data the root object
 * @param {*} func the visitor
 * @param {*} allowInherited whether inherited properties of the root are visited as well
 */
function dfsObject(data, func, allowInherited = false) {
    let isRoot = true;
    crawlObj("", data, func, allowInherited);

    function crawlObj(name, node, f, inheritedAllowed) {
        const isObj = (typeof node == 'object');
        const isFunc = (typeof node == 'function');
        if (!isFunc)
            f(name, node, isRoot, isObj);   // visit the current node
        isRoot = false;
        if (!isObj)
            return;
        for (const child in node) {
            if (inheritedAllowed || Object.prototype.hasOwnProperty.call(node, child)) {
                const childName = (name == "" ? child : name + "." + child);
                crawlObj(childName, node[child], f, true);  // After root level no need to check inheritance
            }
        }
    }
}