Results produce_results()

in src/perf_client/timing.h [309:539]


    // Builds a Results summary from the recorded sends and receives.
    //
    // The sends are split into contiguous rounds (the final round absorbs any
    // remainder). For each send, the receives are scanned forward from the
    // position after the previously matched reply, looking for a reply with
    // the same rpc_id. The difference between that reply's receive_time and
    // the send's send_time is recorded as a local commit latency. For sends
    // which expect a commit, a global commit latency is recorded as well:
    // immediately if the reply's global_seqno already covers its commit seqno,
    // otherwise when a later receive reports a sufficiently high global_seqno.
    //
    // Parameters:
    //   allow_pending: if false, a runtime_error is thrown when a round ends
    //     with global commits still outstanding.
    //   highest_local_commit: replies whose commit seqno is above this value
    //     are not tracked for global commit. The first receive carrying a
    //     commit and a global_seqno at or beyond this value also determines
    //     the reported duration.
    //   desired_rounds: requested number of rounds. Clamped so that there is
    //     at least one round, and no more rounds than sends.
    //
    // Returns the totals, start time, duration and overall latency
    // measurements, plus per-round measurements if there is more than one
    // round.
    //
    // Throws runtime_error if global commits are outstanding (see
    // allow_pending), or if a round's local sample count differs from its
    // number of sends.
    Results produce_results(
      bool allow_pending,
      size_t highest_local_commit,
      size_t desired_rounds = 1)
    {
      TimeDelta end_time_delta = Clock::now() - start_time;

      // Never use more rounds than sends, and always use at least one round so
      // that a desired_rounds of 0 cannot cause a division by zero below
      const size_t max_rounds = max<size_t>(sends.size(), 1);
      const size_t rounds = max<size_t>(min(max_rounds, desired_rounds), 1);
      const auto round_size = sends.size() / rounds;

      // Assume we receive responses in the same order requests were sent, then
      // duplicate IDs shouldn't cause a problem
      size_t next_recv = 0u;

      using Latencies = vector<double>;

      // A committed transaction which is still waiting to be globally
      // committed
      struct PendingGlobalCommit
      {
        TimeDelta send_time;
        size_t target_commit;
      };

      Results res;
      Latencies all_local_commits;
      Latencies all_global_commits;

      // Get test duration from the first receive whose global commit covers
      // the highest local commit
      for (size_t i = 0; i < receives.size(); ++i)
      {
        const auto& receive = receives[i];

        if (
          receive.commit.has_value() &&
          receive.global_seqno >= highest_local_commit)
        {
          LOG_INFO_FMT(
            "Global commit match {} for highest local commit {}",
            receive.global_seqno,
            highest_local_commit);
          auto was =
            duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
          auto is =
            duration_cast<milliseconds>(receive.receive_time).count() / 1000.0;
          LOG_INFO_FMT("Duration changing from {}s to {}s", was, is);
          end_time_delta = receive.receive_time;
          break;
        }
      }

      for (size_t round = 1; round <= rounds; ++round)
      {
        const auto round_begin = sends.begin() + (round_size * (round - 1));
        // The final round also takes any sends left over by the division
        const auto round_end =
          round == rounds ? sends.end() : round_begin + round_size;

        Latencies round_local_commit;
        Latencies round_global_commit;

        // Pending global commits are tracked per round; anything still
        // pending when the round ends is not carried into the next round
        vector<PendingGlobalCommit> pending_global_commits;

        // Records a global commit latency for, and then removes, every
        // leading pending entry which this receive's global_seqno satisfies
        auto complete_pending = [&](const ReceivedReply& receive) {
          if (receive.global_seqno > 0)
          {
            auto pending_it = pending_global_commits.begin();
            while (pending_it != pending_global_commits.end())
            {
              if (receive.global_seqno >= pending_it->target_commit)
              {
                round_global_commit.push_back(
                  (receive.receive_time - pending_it->send_time).count());
                ++pending_it;
              }
              else
              {
                // Assuming the target_commits within pending_global_commits are
                // monotonic, we can break here. If this receive didn't satisfy
                // the first pending commit, it can't satisfy any later
                break;
              }
            }
            if (pending_it != pending_global_commits.begin())
            {
              pending_global_commits.erase(
                pending_global_commits.begin(), pending_it);
            }
          }
        };

        for (auto send_it = round_begin; send_it != round_end; ++send_it)
        {
          const auto& send = *send_it;

          // Stays at zero if no reply is accepted for this send, so that an
          // indeterminate (or rejected negative) value is never recorded
          double tx_latency = 0.0;
          optional<ReceivedReply> matching_reply;
          for (auto i = next_recv; i < receives.size(); ++i)
          {
            const auto& receive = receives[i];

            complete_pending(receive);

            if (receive.rpc_id != send.rpc_id)
            {
              continue;
            }

            const double candidate_latency =
              (receive.receive_time - send.send_time).count();

            if (candidate_latency < 0)
            {
              LOG_FAIL_FMT(
                "Calculated a negative latency ({}) for RPC {} - duplicate "
                "ID causing mismatch?",
                candidate_latency,
                receive.rpc_id);
              continue;
            }

            tx_latency = candidate_latency;
            matching_reply = receive;
            next_recv = i + 1;
            break;
          }

          // Every send (read-only, failed write, or successful write)
          // contributes exactly one local sample
          round_local_commit.push_back(tx_latency);

          // Only a write whose reply actually carries a commit can be
          // measured for global commit
          const bool committed_write = send.expects_commit &&
            matching_reply.has_value() && matching_reply->commit.has_value();
          if (!committed_write)
          {
            continue;
          }

          const auto commit_seqno = matching_reply->commit->seqno;
          if (matching_reply->global_seqno >= commit_seqno)
          {
            // Already globally committed when the reply was received
            round_global_commit.push_back(tx_latency);
          }
          else if (commit_seqno <= highest_local_commit)
          {
            // Store expected global commit to find later
            pending_global_commits.push_back({send.send_time, commit_seqno});
          }
          else
          {
            LOG_DEBUG_FMT(
              "Ignoring request with ID {} because it committed too late "
              "({} > {})",
              send.rpc_id,
              commit_seqno,
              highest_local_commit);
          }
        }

        // After every tracked send has been processed, consider every remaining
        // receive to satisfy outstanding pending global commits
        for (auto i = next_recv; i < receives.size(); ++i)
        {
          if (pending_global_commits.empty())
          {
            break;
          }

          complete_pending(receives[i]);
        }

        all_local_commits.insert(
          all_local_commits.end(),
          round_local_commit.begin(),
          round_local_commit.end());
        all_global_commits.insert(
          all_global_commits.end(),
          round_global_commit.begin(),
          round_global_commit.end());

        // Per-round results are only reported when there are multiple rounds
        if (rounds > 1)
        {
          res.per_round.push_back(
            {round_begin->rpc_id,
             (round_end - 1)->rpc_id,
             measure(round_local_commit),
             measure(round_global_commit)});
        }

        if (!allow_pending && !pending_global_commits.empty())
        {
          const auto& first = pending_global_commits[0];
          throw runtime_error(fmt::format(
            "Still waiting for {} global commits. First expected is {} for "
            "a transaction sent at {} (NB: Highest local commit is {})",
            pending_global_commits.size(),
            first.target_commit,
            first.send_time.count(),
            highest_local_commit));
        }

        // Sanity check: each send in the round should have produced exactly
        // one local sample
        const auto expected_local_samples =
          static_cast<size_t>(distance(round_begin, round_end));
        const auto actual_local_samples = round_local_commit.size();
        if (actual_local_samples != expected_local_samples)
        {
          throw runtime_error(fmt::format(
            "Measured {} response times, yet sent {} requests",
            actual_local_samples,
            expected_local_samples));
        }
      }

      res.total_sends = sends.size();
      res.total_receives = receives.size();
      res.start_time = start_time;
      res.duration = end_time_delta;

      res.total_local_commit = measure(all_local_commits);
      res.total_global_commit = measure(all_global_commits);
      return res;
    }