public boolean execute()

in pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java [139:273]


  /**
   * Validates the command arguments, loads the queries from the query file and runs them against the configured
   * broker using the runner selected by {@code _mode}.
   *
   * <p>Every validation failure (common arguments, mode-specific arguments, or an unknown mode) logs an error,
   * prints the usage and makes this method return {@code false} without running any query.
   *
   * @return {@code true} once the selected query runner has returned, {@code false} if any argument was rejected
   * @throws Exception if the query file cannot be read, the query mode is not a valid {@code QueryMode} constant,
   *         or the selected query runner fails
   */
  public boolean execute()
      throws Exception {
    // Arguments that apply to every mode.
    if (!new File(_queryFile).isFile()) {
      LOGGER.error("Argument queryFile: {} is not a valid file.", _queryFile);
      printUsage();
      return false;
    }
    if (_numTimesToRunQueries < 0) {
      LOGGER.error("Argument numTimesToRunQueries should be a non-negative number.");
      printUsage();
      return false;
    }
    if (_reportIntervalMs <= 0) {
      LOGGER.error("Argument reportIntervalMs should be a positive number.");
      printUsage();
      return false;
    }
    if (_numIntervalsToReportAndClearStatistics < 0) {
      LOGGER.error("Argument numIntervalsToReportAndClearStatistics should be a non-negative number.");
      printUsage();
      return false;
    }
    if (_queueDepth <= 0) {
      LOGGER.error("Argument queueDepth should be a positive number.");
      printUsage();
      return false;
    }

    LOGGER.info("Start query runner targeting broker: {}:{}", _brokerHost, _brokerPort);
    // Driver configuration: only query running is enabled, every start* flag is switched off.
    PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
    conf.setBrokerHost(_brokerHost);
    conf.setBrokerPort(_brokerPort);
    conf.setBrokerURL(_brokerURL);
    conf.setRunQueries(true);
    conf.setStartZookeeper(false);
    conf.setStartController(false);
    conf.setStartBroker(false);
    conf.setStartServer(false);
    conf.setVerbose(_verbose);
    conf.setUser(_user);
    conf.setPassword(_password);
    conf.setAuthToken(_authToken);
    conf.setAuthTokenUrl(_authTokenUrl);

    // IOUtils.readLines does not close the stream it is given, so close it here, including when
    // QueryMode.valueOf or makeQueries throws.
    List<String> queries;
    try (FileInputStream queryFileStream = new FileInputStream(_queryFile)) {
      queries = makeQueries(IOUtils.readLines(queryFileStream), QueryMode.valueOf(_queryMode.toUpperCase()),
          _queryCount);
    }

    switch (_mode) {
      case "singleThread":
        LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, "
                + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
        singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs,
            _numIntervalsToReportAndClearStatistics, _timeout,
            AuthProviderUtils.makeAuthHeadersMap(AuthProviderUtils.makeAuthProvider(null,
                _authTokenUrl, _authToken, _user, _password)), _enablePerQueryStats);
        break;
      case "multiThreads":
        if (_numThreads <= 0) {
          LOGGER.error("For multiThreads mode, argument numThreads should be a positive number.");
          printUsage();
          return false;
        }
        LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, "
                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
            _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
            _queueDepth, _timeout);
        multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs,
            _numIntervalsToReportAndClearStatistics, _timeout,
            AuthProviderUtils.makeAuthHeadersMap(AuthProviderUtils.makeAuthProvider(null,
                _authTokenUrl, _authToken, _user, _password)), _enablePerQueryStats);
        break;
      case "targetQPS":
        if (_numThreads <= 0) {
          LOGGER.error("For targetQPS mode, argument numThreads should be a positive number.");
          printUsage();
          return false;
        }
        if (_startQPS <= 0 || _startQPS > 1000000.0) {
          LOGGER.error(
              "For targetQPS mode, argument startQPS should be a positive number that less or equal to 1000000.");
          printUsage();
          return false;
        }
        LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
            _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
            _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout);
        targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS,
            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout,
            AuthProviderUtils.makeAuthHeadersMap(
                AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
            _enablePerQueryStats);
        break;
      case "increasingQPS":
        if (_numThreads <= 0) {
          LOGGER.error("For increasingQPS mode, argument numThreads should be a positive number.");
          printUsage();
          return false;
        }
        if (_startQPS <= 0 || _startQPS > 1000000.0) {
          LOGGER.error(
              "For increasingQPS mode, argument startQPS should be a positive number that less or equal to 1000000.");
          printUsage();
          return false;
        }
        if (_deltaQPS <= 0) {
          LOGGER.error("For increasingQPS mode, argument deltaQPS should be a positive number.");
          printUsage();
          return false;
        }
        if (_numIntervalsToIncreaseQPS <= 0) {
          LOGGER.error("For increasingQPS mode, argument numIntervalsToIncreaseQPS should be a positive number.");
          printUsage();
          return false;
        }
        LOGGER.info("MODE increasingQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
                + "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, "
                + "numIntervalsToIncreaseQPS: {}, queueDepth: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
            _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
            _numIntervalsToIncreaseQPS, _queueDepth, _timeout);
        increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS,
            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout,
            AuthProviderUtils.makeAuthHeadersMap(
                AuthProviderUtils.makeAuthProvider(null, _authTokenUrl, _authToken, _user, _password)),
            _enablePerQueryStats);
        break;
      default:
        // An unknown mode is an argument error like the ones above, so it must not be reported as success.
        LOGGER.error("Invalid mode: {}", _mode);
        printUsage();
        return false;
    }
    return true;
  }