perf/perf_consumer.js (68 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ const commander = require('commander'); const delay = require('delay'); const { performance, } = require('perf_hooks'); const Pulsar = require('../index.js'); // Parse args (() => { commander .option('-s, --subscription [subscription]', 'Subscription') .option('-u, --url [url]', 'Pulsar Service URL') .option('-t, --topic [topic]', 'Topic Name') .option('-m, --messages [messages]', 'Number of Messages', 1000) .option('-i, --iteration [iteration]', 'Iteration of Measure', 3, parseInt) .on('--help', () => { console.log(''); console.log('Examples:'); console.log(' $ node perf/perf_consumer.js --subscription sub1 --url pulsar://localhost:6650 --topic persistent://public/default/my-topic'); }) .parse(process.argv); if (typeof commander.subscription === 'undefined') { console.error('no subscription given!'); process.exit(1); } if (typeof commander.url === 'undefined') { console.error('no URL given!'); process.exit(1); } if (typeof commander.topic === 'undefined') { console.error('no topic name given!'); process.exit(1); } console.log('----------------------'); commander.options.forEach((option) => { const optionName = (option.long).replace('--', ''); console.log(`${optionName}: ${commander[optionName]}`); }); console.log('----------------------'); })(); (async () => { // Create a client const clientConfig = { serviceUrl: commander.url, }; const client = new Pulsar.Client(clientConfig); // Create a consumer const consumerConfig = { topic: commander.topic, subscription: commander.subscription, }; const consumer = await client.subscribe(consumerConfig); const numOfMessages = parseInt(commander.messages, 10); for (let i = 0; i < commander.iteration; i += 1) { // measure await delay(1000); const startTimeMilliSeconds = performance.now(); const results = []; for (let mi = 0; mi < numOfMessages; mi += 1) { results.push(new Promise((resolve) => { consumer.receive().then((msg) => { consumer.acknowledge(msg); resolve(msg.getData().length); }); })); } const messageSizes = await Promise.all(results); const endTimeMilliSeconds = performance.now(); // result const durationSeconds = (endTimeMilliSeconds - startTimeMilliSeconds) / 1000.0; const rate = numOfMessages / durationSeconds; const totalMessageSize = messageSizes.reduce((a, b) => a + b); const throuhputMbit = (totalMessageSize / 1024 / 1024 * 8) / durationSeconds; console.log('Throughput received: %f msg/s --- %f Mbit/s', rate.toFixed(3), throuhputMbit.toFixed(3)); } await consumer.close(); await client.close(); })();