e2e-examples/gcs/benchmark/runner_watcher.cc (70 lines of code) (raw):
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "runner_watcher.h"
RunnerWatcher::RunnerWatcher(size_t warmups, bool verbose)
: warmups_(warmups), verbose_(verbose) {}
absl::Time RunnerWatcher::GetStartTime() const { return start_time_; }
void RunnerWatcher::SetStartTime(absl::Time start_time) {
start_time_ = start_time;
}
absl::Duration RunnerWatcher::GetDuration() const { return duration_; }
void RunnerWatcher::SetDuration(absl::Duration duration) {
duration_ = duration;
}
void RunnerWatcher::NotifyCompleted(OperationType operationType,
int32_t runner_id, int64_t channel_id,
std::string peer, std::string bucket,
std::string object, grpc::Status status,
int64_t bytes, absl::Time time,
absl::Duration elapsed_time,
std::vector<Chunk> chunks) {
Operation op;
op.type = operationType;
op.runner_id = runner_id;
op.channel_id = channel_id;
op.peer = peer;
op.bucket = bucket;
op.object = object;
op.status = status;
op.bytes = bytes;
op.time = time;
op.elapsed_time = elapsed_time;
op.chunks = std::move(chunks);
// Insert records
size_t ord;
{
absl::MutexLock l(&lock_);
operations_.push_back(std::move(op));
ord = operations_.size();
}
if (verbose_) {
auto sec = absl::ToDoubleSeconds(op.elapsed_time);
printf(
"### %sCompleted: ord=%ld time=%s peer=%s bucket=%s object=%s "
"bytes=%lld elapsed=%.2fs%s\n",
ToOperationTypeString(operationType), ord,
absl::FormatTime(absl::RFC3339_sec, time, absl::UTCTimeZone()).c_str(),
peer.c_str(), bucket.c_str(), object.c_str(), (long long)bytes, sec,
ord <= warmups_ ? " [WARM-UP]" : "");
fflush(stdout);
}
}
std::vector<RunnerWatcher::Operation> RunnerWatcher::GetNonWarmupsOperations()
const {
absl::MutexLock l(&lock_);
if (warmups_ >= operations_.size()) {
return {};
}
return std::vector<RunnerWatcher::Operation>(operations_.begin() + warmups_,
operations_.end());
}
absl::Duration RunnerWatcher::GetNonWarmupsDuration() const {
auto operations = GetNonWarmupsOperations();
if (operations.empty()) {
return absl::ZeroDuration();
}
absl::Time begin = operations[0].time;
absl::Time end = operations[0].time + operations[0].elapsed_time;
for (auto op : operations) {
begin = std::min(begin, op.time);
end = std::max(begin, op.time) + op.elapsed_time;
}
return end - begin;
}