int fizzClientLoadGenCommand()

in fizz/tool/FizzClientLoadGenCommand.cpp [171:305]


int fizzClientLoadGenCommand(const std::vector<std::string>& args) {
  // configurable parameters
  struct ClientLoadgenConfig config;
  bool enableBatch = false;

  // Argument Handler Map
  // clang-format off
  FizzArgHandlerMap handlers = {
    {"-connect", {true, [&config](const std::string& arg) {
        std::tie(config.host, config.port) = hostPortFromString(arg);
     }}},
    {"-threads", {true, [&config](const std::string& arg) {
      config.threadNum = std::stoi(arg);
    }}},
    {"-tasks", {true, [&config](const std::string& arg) {
      config.numTaskPerSecond = std::stoi(arg);
    }}},
    {"-time", {true, [&config](const std::string& arg) {
      config.totalTime = std::stoi(arg);
    }}},
    {"-cafile", {true, [&config](const std::string& arg) {
      config.caFile = arg;
    }}},
    {"-0rtt", {true, [&config](const std::string& arg) {
      config.pskLoadFile = arg;
    }}},
    {"-json", {false, [&config](const std::string&) {
      config.jsonOutput = true;
    }}},
    {"-p", {true, [&config](const std::string& arg) {
      config.percentiles.emplace_back((float)std::stoi(arg) / 100);
    }}},
    {"-min", {true, [&config](const std::string& arg) {
      config.minLatency = std::stoi(arg);
    }}},
    {"-max", {true, [&config](const std::string& arg) {
      config.maxLatency = std::stoi(arg);
    }}},
    {"-batch", {false, [&enableBatch](const std::string&) {
      enableBatch = true;
    }}}
  };
  // clang-format on

  // parse arguments
  try {
    if (parseArguments(args, handlers, printUsage)) {
      return 1;
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << "Error: " << e.what();
    return 1;
  }
  if (config.numTaskPerSecond <= 0) {
    config.numTaskPerSecond = config.threadNum;
  }

  // set up the IO Thread Pool and get the EventBase
  auto threadExe = std::make_shared<IOThreadPoolExecutor>(
      config.threadNum,
      std::make_shared<NamedThreadFactory>("LoadGenClientPool"),
      folly::EventBaseManager::get(),
      IOThreadPoolExecutor::Options().setWaitForAll(true));

  // Prepare FizzClientContext
  auto clientContext = std::make_shared<FizzClientContext>();
  clientContext->setSupportedCiphers({CipherSuite::TLS_AES_128_GCM_SHA256});
  clientContext->setSupportedVersions(
      {ProtocolVersion::tls_1_3, ProtocolVersion::tls_1_3_28});
  clientContext->setSupportedSigSchemes(
      {SignatureScheme::rsa_pss_sha256,
       SignatureScheme::ecdsa_secp256r1_sha256,
       SignatureScheme::ecdsa_secp384r1_sha384});
  if (enableBatch) {
    clientContext->setFactory(BatchSignatureFactory::makeBatchSignatureFactory(
        clientContext->getFactoryPtr()));
    clientContext->setSupportedSigSchemes(
        {SignatureScheme::rsa_pss_sha256_batch,
         SignatureScheme::ecdsa_secp256r1_sha256_batch});
  }

  std::shared_ptr<const CertificateVerifier> verifier;

  if (!config.caFile.empty()) {
    // Initialize CA store and the verifier for server certificate verification
    folly::ssl::X509StoreUniquePtr storePtr;
    storePtr.reset(X509_STORE_new());
    if (X509_STORE_load_locations(
            storePtr.get(), config.caFile.c_str(), nullptr) == 0) {
      VLOG(1) << "Failed to load CA certificates";
      return 1;
    }
    verifier = std::make_shared<const DefaultCertificateVerifier>(
        VerificationContext::Client, std::move(storePtr));
  }

  // Start creating clients and connecting
  std::vector<std::chrono::microseconds> stats;
  stats.resize(
      config.numTaskPerSecond * config.totalTime,
      std::chrono::microseconds::zero());
  auto it = stats.begin();
  auto periodMS = 1000 / config.numTaskPerSecond;
  SocketAddress addr(config.host, config.port, true);
  for (size_t i = 0; i < stats.size(); i++) {
    via(threadExe->weakRef()).thenValue([=](auto&&) {
      auto evb = folly::EventBaseManager::get()->getEventBase();
      auto task = new ClientTask(evb, clientContext, verifier, &(*it));
      task->start(addr);
    });
    it++;
    std::this_thread::sleep_for(std::chrono::milliseconds(periodMS));
  }

  // Wait for assignments finished
  threadExe->join();

  // generate statistics
  if (config.jsonOutput) {
    const int buckWidth = 1000; // 1ms as the bucket width
    Histogram<uint32_t> histo(buckWidth, config.minLatency, config.maxLatency);
    uint32_t totalLatency = 0;
    size_t numSuccess = 0;
    for (const auto& val : stats) {
      if (val != std::chrono::microseconds::zero()) {
        numSuccess++;
        histo.addValue(val.count());
        totalLatency += val.count();
      }
    }
    std::cout << getJsonStr(totalLatency, histo, numSuccess, config)
              << std::endl;
  }
  return 0;
}