horovod/common/timeline.cc (233 lines of code) (raw):
// Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================
#include "timeline.h"
#include <cassert>
#include <chrono>
#include <sstream>
#include <thread>
#include "logging.h"
namespace horovod {
namespace common {
void TimelineWriter::Initialize(std::string file_name) {
file_.open(file_name, std::ios::out | std::ios::trunc);
if (file_.good()) {
// Initialize the timeline with '[' character.
file_ << "[\n";
healthy_ = true;
// Spawn writer thread.
std::thread writer_thread(&TimelineWriter::WriterLoop, this);
writer_thread.detach();
} else {
LOG(ERROR) << "Error opening the Horovod Timeline file " << file_name
<< ", will not write a timeline.";
}
}
void TimelineWriter::EnqueueWriteEvent(const std::string& tensor_name,
char phase, const std::string& op_name,
const std::string& args,
long ts_micros) {
TimelineRecord r{};
r.type = TimelineRecordType::EVENT;
r.tensor_name = tensor_name;
r.phase = phase;
r.op_name = op_name;
r.args = args;
r.ts_micros = ts_micros;
while (healthy_ && !record_queue_.push(r))
;
}
void TimelineWriter::EnqueueWriteMarker(const std::string& name,
long ts_micros) {
TimelineRecord r{};
r.type = TimelineRecordType::MARKER;
r.marker_name = name;
r.ts_micros = ts_micros;
while (healthy_ && !record_queue_.push(r))
;
}
void TimelineWriter::DoWriteEvent(const TimelineRecord& r) {
assert(r.type == TimelineRecordType::EVENT);
auto& tensor_idx = tensor_table_[r.tensor_name];
if (tensor_idx == 0) {
tensor_idx = (int)tensor_table_.size();
// We model tensors as processes. Register metadata for this "pid".
file_ << "{";
file_ << "\"name\": \"process_name\"";
file_ << ", \"ph\": \"M\"";
file_ << ", \"pid\": " << tensor_idx << "";
file_ << ", \"args\": {\"name\": \"" << r.tensor_name << "\"}";
file_ << "}," << std::endl;
file_ << "{";
file_ << "\"name\": \"process_sort_index\"";
file_ << ", \"ph\": \"M\"";
file_ << ", \"pid\": " << tensor_idx << "";
file_ << ", \"args\": {\"sort_index\": " << tensor_idx << "}";
file_ << "}," << std::endl;
}
file_ << "{";
file_ << "\"ph\": \"" << r.phase << "\"";
if (r.phase != 'E') {
// Not necessary for ending event.
file_ << ", \"name\": \"" << r.op_name << "\"";
}
file_ << ", \"ts\": " << r.ts_micros << "";
file_ << ", \"pid\": " << tensor_idx << "";
if (r.phase == 'X') {
file_ << ", \"dur\": " << 0 << "";
}
if (r.args != "") {
file_ << ", \"args\": {" << r.args << "}";
}
file_ << "}," << std::endl;
}
void TimelineWriter::DoWriteMarker(const TimelineRecord& r) {
assert(r.type == TimelineRecordType::MARKER);
file_ << "{";
file_ << "\"ph\": \"i\"";
file_ << ", \"name\": \"" << r.marker_name << "\"";
file_ << ", \"ts\": " << r.ts_micros << "";
file_ << ", \"s\": \"g\"";
file_ << "}," << std::endl;
}
void TimelineWriter::WriterLoop() {
while (healthy_) {
while (healthy_ && !record_queue_.empty()) {
auto& r = record_queue_.front();
switch (r.type) {
case TimelineRecordType::EVENT:
DoWriteEvent(r);
break;
case TimelineRecordType::MARKER:
DoWriteMarker(r);
break;
default:
throw std::logic_error("Unknown event type provided.");
}
record_queue_.pop();
if (!file_.good()) {
LOG(ERROR) << "Error writing to the Horovod Timeline after it was "
"successfully opened, will stop writing the timeline.";
healthy_ = false;
}
}
// Allow scheduler to schedule other work for this core.
std::this_thread::yield();
}
}
void Timeline::Initialize(std::string file_name, unsigned int horovod_size) {
if (initialized_) {
return;
}
// Start the writer.
writer_.Initialize(std::move(file_name));
// Initialize if we were able to open the file successfully.
initialized_ = writer_.IsHealthy();
// Pre-initialize the string representation for each rank.
rank_strings_ = std::vector<std::string>(horovod_size);
for (unsigned int i = 0; i < horovod_size; i++) {
rank_strings_[i] = std::to_string(i);
}
}
long Timeline::TimeSinceStartMicros() const {
auto now = std::chrono::steady_clock::now();
auto ts = now - start_time_;
return std::chrono::duration_cast<std::chrono::microseconds>(ts).count();
}
// Write event to the Horovod Timeline file.
void Timeline::WriteEvent(const std::string& tensor_name, const char phase,
const std::string& op_name, const std::string& args) {
auto ts_micros = TimeSinceStartMicros();
writer_.EnqueueWriteEvent(tensor_name, phase, op_name, args, ts_micros);
}
void Timeline::WriteMarker(const std::string& name) {
auto ts_micros = TimeSinceStartMicros();
writer_.EnqueueWriteMarker(name, ts_micros);
}
void Timeline::NegotiateStart(const std::string& tensor_name,
const Request::RequestType request_type) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
// Note: Need to enable repeated calls to this routine during negotiate
// phase. Repeated calls can occur if a cached response initiates the
// negotiation phase, either due to multiple cycles with cache misses on
// some worker, or if the response is evicted from the cache before
// completion and its handling proceeds to the default communication path.
// First call takes precedence.
if (tensor_states_[tensor_name] == TimelineState::NEGOTIATING) {
return;
}
assert(tensor_states_[tensor_name] == TimelineState::UNKNOWN);
auto event_category =
"NEGOTIATE_" + Request::RequestType_Name(request_type);
WriteEvent(tensor_name, 'B', event_category);
tensor_states_[tensor_name] = TimelineState::NEGOTIATING;
}
void Timeline::NegotiateRankReady(const std::string& tensor_name,
const int rank) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
assert(tensor_states_[tensor_name] == TimelineState::NEGOTIATING);
WriteEvent(tensor_name, 'X', rank_strings_[rank]);
}
void Timeline::NegotiateEnd(const std::string& tensor_name) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
assert(tensor_states_[tensor_name] == TimelineState::NEGOTIATING);
WriteEvent(tensor_name, 'E');
tensor_states_.erase(tensor_name);
}
void Timeline::Start(const std::string& tensor_name,
const Response::ResponseType response_type) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
assert(tensor_states_[tensor_name] == TimelineState::UNKNOWN);
auto event_category = Response::ResponseType_Name(response_type);
WriteEvent(tensor_name, 'B', event_category);
tensor_states_[tensor_name] = TimelineState::TOP_LEVEL;
}
void Timeline::ActivityStartAll(const std::vector<TensorTableEntry>& entries,
const std::string& activity) {
for (auto& e : entries) {
ActivityStart(e.tensor_name, activity);
}
}
void Timeline::ActivityStart(const std::string& tensor_name,
const std::string& activity) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
assert(tensor_states_[tensor_name] == TimelineState::TOP_LEVEL);
WriteEvent(tensor_name, 'B', activity);
tensor_states_[tensor_name] = TimelineState::ACTIVITY;
}
void Timeline::ActivityEndAll(const std::vector<TensorTableEntry>& entries) {
for (auto& e : entries) {
ActivityEnd(e.tensor_name);
}
}
void Timeline::ActivityEnd(const std::string& tensor_name) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
assert(tensor_states_[tensor_name] == TimelineState::ACTIVITY);
WriteEvent(tensor_name, 'E');
tensor_states_[tensor_name] = TimelineState::TOP_LEVEL;
}
void Timeline::End(const std::string& tensor_name,
const std::shared_ptr<Tensor> tensor) {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
// Pop out of current state, if applicable.
if (tensor_states_[tensor_name] == TimelineState::ACTIVITY) {
ActivityEnd(tensor_name);
}
std::stringstream args;
if (tensor != nullptr) {
args << "\"dtype\": \"" << DataType_Name(tensor->dtype()) << "\"";
args << ", \"shape\": \"" << tensor->shape().DebugString() << "\"";
}
WriteEvent(tensor_name, 'E', "", args.str());
}
void Timeline::MarkCycleStart() {
if (!initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> guard(mutex_);
WriteMarker("CYCLE_START");
}
} // namespace common
} // namespace horovod