in pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java [139:273]
/**
 * Validates the command arguments, loads the queries from {@code _queryFile} and dispatches to the query
 * runner selected by {@code _mode} ("singleThread", "multiThreads", "targetQPS" or "increasingQPS").
 *
 * <p>Every validation failure logs an error, prints the usage and returns {@code false}. This covers the
 * general argument checks, the mode-specific argument checks and an unrecognised mode.
 *
 * @return {@code true} once the selected query runner has returned, {@code false} if any argument is invalid
 * @throws Exception if reading the query file fails, if {@code _queryMode} does not name a {@code QueryMode}
 *     constant, or if the selected query runner throws
 */
public boolean execute()
    throws Exception {
  // ---- Arguments shared by every mode ----
  if (!new File(_queryFile).isFile()) {
    LOGGER.error("Argument queryFile: {} is not a valid file.", _queryFile);
    printUsage();
    return false;
  }
  if (_numTimesToRunQueries < 0) {
    LOGGER.error("Argument numTimesToRunQueries should be a non-negative number.");
    printUsage();
    return false;
  }
  if (_reportIntervalMs <= 0) {
    LOGGER.error("Argument reportIntervalMs should be a positive number.");
    printUsage();
    return false;
  }
  if (_numIntervalsToReportAndClearStatistics < 0) {
    LOGGER.error("Argument numIntervalsToReportAndClearStatistics should be a non-negative number.");
    printUsage();
    return false;
  }
  if (_queueDepth <= 0) {
    LOGGER.error("Argument queueDepth should be a positive number.");
    printUsage();
    return false;
  }

  LOGGER.info("Start query runner targeting broker: {}:{}", _brokerHost, _brokerPort);

  // Only query execution is enabled; no cluster component is started by this configuration.
  PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
  conf.setBrokerHost(_brokerHost);
  conf.setBrokerPort(_brokerPort);
  conf.setBrokerURL(_brokerURL);
  conf.setRunQueries(true);
  conf.setStartZookeeper(false);
  conf.setStartController(false);
  conf.setStartBroker(false);
  conf.setStartServer(false);
  conf.setVerbose(_verbose);
  conf.setUser(_user);
  conf.setPassword(_password);
  conf.setAuthToken(_authToken);
  conf.setAuthTokenUrl(_authTokenUrl);

  List<String> queries;
  // try-with-resources so the file handle is released even when parsing the query mode or building the
  // query list throws.
  try (FileInputStream queryFileStream = new FileInputStream(_queryFile)) {
    // The platform default charset is passed explicitly: same decoding as the deprecated no-charset
    // overload, without relying on it. Locale.ROOT keeps the enum lookup independent of the JVM's locale.
    queries = makeQueries(IOUtils.readLines(queryFileStream, java.nio.charset.Charset.defaultCharset()),
        QueryMode.valueOf(_queryMode.toUpperCase(java.util.Locale.ROOT)), _queryCount);
  }

  switch (_mode) {
    case "singleThread":
      LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, "
              + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
          _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
      singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs,
          _numIntervalsToReportAndClearStatistics, _timeout,
          AuthProviderUtils.makeAuthHeadersMap(
              AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
          _enablePerQueryStats);
      break;

    case "multiThreads":
      if (_numThreads <= 0) {
        LOGGER.error("For multiThreads mode, argument numThreads should be a positive number.");
        printUsage();
        return false;
      }
      LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, "
              + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
          _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
          _queueDepth, _timeout);
      multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs,
          _numIntervalsToReportAndClearStatistics, _timeout,
          AuthProviderUtils.makeAuthHeadersMap(
              AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
          _enablePerQueryStats);
      break;

    case "targetQPS":
      if (_numThreads <= 0) {
        LOGGER.error("For targetQPS mode, argument numThreads should be a positive number.");
        printUsage();
        return false;
      }
      if (_startQPS <= 0 || _startQPS > 1000000.0) {
        LOGGER.error(
            "For targetQPS mode, argument startQPS should be a positive number that less or equal to 1000000.");
        printUsage();
        return false;
      }
      LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
              + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
          _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
          _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout);
      targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS,
          _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout,
          AuthProviderUtils.makeAuthHeadersMap(
              AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
          _enablePerQueryStats);
      break;

    case "increasingQPS":
      if (_numThreads <= 0) {
        LOGGER.error("For increasingQPS mode, argument numThreads should be a positive number.");
        printUsage();
        return false;
      }
      if (_startQPS <= 0 || _startQPS > 1000000.0) {
        LOGGER.error(
            "For increasingQPS mode, argument startQPS should be a positive number that less or equal to 1000000.");
        printUsage();
        return false;
      }
      if (_deltaQPS <= 0) {
        LOGGER.error("For increasingQPS mode, argument deltaQPS should be a positive number.");
        printUsage();
        return false;
      }
      if (_numIntervalsToIncreaseQPS <= 0) {
        LOGGER.error("For increasingQPS mode, argument numIntervalsToIncreaseQPS should be a positive number.");
        printUsage();
        return false;
      }
      LOGGER.info("MODE increasingQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
              + "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, "
              + "numIntervalsToIncreaseQPS: {}, queueDepth: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
          _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
          _numIntervalsToIncreaseQPS, _queueDepth, _timeout);
      increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS,
          _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout,
          AuthProviderUtils.makeAuthHeadersMap(
              AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
          _enablePerQueryStats);
      break;

    default:
      // An unrecognised mode is an argument error like the ones above, so it must not report success.
      LOGGER.error("Invalid mode: {}", _mode);
      printUsage();
      return false;
  }
  return true;
}