in src/perf_client/timing.h [309:539]
// Builds latency statistics by pairing each entry in `sends` with a reply in
// `receives`.
//
// Every answered request contributes one "local commit" latency sample
// (receive time minus send time). Requests that expect a commit and whose
// reply carries one additionally contribute a "global commit" sample, taken
// either from the same reply (if it already reports a sufficient
// global_seqno) or from the first later reply whose global_seqno reaches the
// request's commit seqno.
//
// Parameters:
//   allow_pending        - when false, global commits still outstanding at the
//                          end of a round cause a runtime_error.
//   highest_local_commit - replies that committed beyond this seqno are not
//                          tracked for global commit. The first committed
//                          reply whose global_seqno reaches it also
//                          determines the reported duration.
//   desired_rounds       - number of equal slices of `sends` to report
//                          separately in `per_round`; clamped to
//                          [1, max(sends.size(), 1)]. Per-round results are
//                          only produced when more than one round is used.
//
// Throws runtime_error if global commits are still pending (and
// allow_pending is false), or if the number of measured responses in a round
// differs from the number of requests sent in that round.
Results produce_results(
  bool allow_pending,
  size_t highest_local_commit,
  size_t desired_rounds = 1)
{
  TimeDelta end_time_delta = Clock::now() - start_time;

  // Never use more rounds than there are sends, and always use at least one
  // round: this keeps the division below well-defined even when the caller
  // passes desired_rounds == 0
  const size_t rounds =
    min(max<size_t>(sends.size(), 1), max<size_t>(desired_rounds, 1));
  const size_t round_size = sends.size() / rounds;

  // Assume we receive responses in the same order requests were sent, then
  // duplicate IDs shouldn't cause a problem. Each send only searches from the
  // first receive that has not been claimed by an earlier send
  size_t next_recv = 0u;

  using Latencies = vector<double>;

  Results res;
  Latencies all_local_commits;
  Latencies all_global_commits;

  // If a committed reply reports a global commit at or beyond the highest
  // local commit, use its receive time as the test duration instead of "now"
  for (const auto& receive : receives)
  {
    if (
      receive.commit.has_value() &&
      receive.global_seqno >= highest_local_commit)
    {
      LOG_INFO_FMT(
        "Global commit match {} for highest local commit {}",
        receive.global_seqno,
        highest_local_commit);

      const auto was =
        duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
      const auto is =
        duration_cast<milliseconds>(receive.receive_time).count() / 1000.0;
      LOG_INFO_FMT("Duration changing from {}s to {}s", was, is);

      end_time_delta = receive.receive_time;
      break;
    }
  }

  for (size_t round = 1; round <= rounds; ++round)
  {
    // The final round absorbs any remainder left by the integer division
    const auto round_begin = sends.begin() + (round_size * (round - 1));
    const auto round_end =
      round == rounds ? sends.end() : round_begin + round_size;

    Latencies round_local_commit;
    Latencies round_global_commit;

    // A request which has been committed locally but whose global commit has
    // not yet been observed in any reply
    struct PendingGlobalCommit
    {
      TimeDelta send_time;
      size_t target_commit;
    };
    vector<PendingGlobalCommit> pending_global_commits;

    // Resolve every pending global commit that this reply's global_seqno
    // satisfies, recording its latency relative to the original send time
    auto complete_pending = [&](const ReceivedReply& receive) {
      if (receive.global_seqno > 0)
      {
        auto pending_it = pending_global_commits.begin();
        while (pending_it != pending_global_commits.end())
        {
          if (receive.global_seqno >= pending_it->target_commit)
          {
            round_global_commit.push_back(
              (receive.receive_time - pending_it->send_time).count());
            ++pending_it;
          }
          else
          {
            // Assuming the target_commits within pending_global_commits are
            // monotonic, we can break here. If this receive didn't satisfy
            // the first pending commit, it can't satisfy any later
            break;
          }
        }

        if (pending_it != pending_global_commits.begin())
        {
          pending_global_commits.erase(
            pending_global_commits.begin(), pending_it);
        }
      }
    };

    for (auto send_it = round_begin; send_it != round_end; ++send_it)
    {
      const auto& send = *send_it;

      double tx_latency = 0.0;
      optional<ReceivedReply> matching_reply;
      for (auto i = next_recv; i < receives.size(); ++i)
      {
        const auto& receive = receives[i];

        complete_pending(receive);

        if (receive.rpc_id != send.rpc_id)
        {
          continue;
        }

        const double candidate_latency =
          (receive.receive_time - send.send_time).count();
        if (candidate_latency < 0)
        {
          LOG_FAIL_FMT(
            "Calculated a negative latency ({}) for RPC {} - duplicate "
            "ID causing mismatch?",
            candidate_latency,
            receive.rpc_id);
          continue;
        }

        tx_latency = candidate_latency;
        matching_reply = receive;
        next_recv = i + 1;
        break;
      }

      if (!matching_reply.has_value())
      {
        // No latency can be measured without a reply. Record nothing, so
        // that the sample-count check at the end of the round reports the
        // mismatch rather than an indeterminate value polluting the results
        LOG_FAIL_FMT("No usable response found for RPC {}", send.rpc_id);
        continue;
      }

      // Every answered request (read-only, successful write or failed write)
      // contributes a local latency sample
      round_local_commit.push_back(tx_latency);

      if (!send.expects_commit || !matching_reply->commit.has_value())
      {
        // Read-only, or the write produced no commit - there is no global
        // commit to look for
        continue;
      }

      // Successful write - try to find global commit time
      if (matching_reply->global_seqno >= matching_reply->commit->seqno)
      {
        // Already globally committed when the reply was produced
        round_global_commit.push_back(tx_latency);
      }
      else if (matching_reply->commit->seqno <= highest_local_commit)
      {
        // Store expected global commit to find later
        pending_global_commits.push_back(
          {send.send_time, matching_reply->commit->seqno});
      }
      else
      {
        LOG_DEBUG_FMT(
          "Ignoring request with ID {} because it committed too late "
          "({} > {})",
          send.rpc_id,
          matching_reply->commit->seqno,
          highest_local_commit);
      }
    }

    // After every tracked send has been processed, consider every remaining
    // receive to satisfy outstanding pending global commits
    for (auto i = next_recv; i < receives.size(); ++i)
    {
      if (pending_global_commits.empty())
      {
        break;
      }

      complete_pending(receives[i]);
    }

    all_local_commits.insert(
      all_local_commits.end(),
      round_local_commit.begin(),
      round_local_commit.end());
    all_global_commits.insert(
      all_global_commits.end(),
      round_global_commit.begin(),
      round_global_commit.end());

    if (rounds > 1)
    {
      res.per_round.push_back(
        {round_begin->rpc_id,
         (round_end - 1)->rpc_id,
         measure(round_local_commit),
         measure(round_global_commit)});
    }

    if (!allow_pending)
    {
      if (!pending_global_commits.empty())
      {
        const auto& first = pending_global_commits[0];
        throw runtime_error(fmt::format(
          "Still waiting for {} global commits. First expected is {} for "
          "a transaction sent at {} (NB: Highest local commit is {})",
          pending_global_commits.size(),
          first.target_commit,
          first.send_time.count(),
          highest_local_commit));
      }
    }

    // Each send in the round must have produced exactly one local sample
    const auto expected_local_samples =
      static_cast<size_t>(distance(round_begin, round_end));
    const auto actual_local_samples = round_local_commit.size();
    if (actual_local_samples != expected_local_samples)
    {
      throw runtime_error(fmt::format(
        "Measured {} response times, yet sent {} requests",
        actual_local_samples,
        expected_local_samples));
    }
  }

  res.total_sends = sends.size();
  res.total_receives = receives.size();
  res.start_time = start_time;
  res.duration = end_time_delta;

  res.total_local_commit = measure(all_local_commits);
  res.total_global_commit = measure(all_global_commits);

  return res;
}