in horovod/common/operations.cc [253:330]
// Executes one negotiated collective described by `response`.
//
// Steps:
//   1. For non-JOIN responses, pulls the matching entries out of the tensor
//      queue and opens a timeline record for each of them.
//   2. If more than one entry is fused, initializes the fusion buffer on the
//      first entry's device/context. If that fails, every entry is completed
//      with the error status and the function returns early.
//   3. Waits until every entry that carries a ready_event reports Ready(),
//      sleeping ~100ns between polling passes.
//   4. Runs the operation through op_manager; a std::exception thrown from it
//      is converted into Status::UnknownError.
//   5. Unless the resulting status is in-progress, closes each entry's
//      timeline record and invokes its callback with that status.
//
// NOTE(review): most shared state is read through the global `horovod_global`
// while `joined` is read through `state`; presumably callers pass
// `horovod_global` as `state` -- confirm before unifying the two.
void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;

  // Closes the timeline record of every entry and reports `status` to its
  // owner. The output tensor is handed to the timeline only on success, so
  // error paths record no output.
  auto complete_entries = [&](const Status& status) {
    for (auto& e : entries) {
      timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
      // Callback can be null if the rank sent Join request.
      if (e.callback != nullptr) {
        e.callback(status);
      }
    }
  };

  if (response.response_type() != Response::JOIN) {
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);
    for (auto& e : entries) {
      timeline.Start(e.tensor_name, response.response_type());
    }

    if (entries.size() > 1) {
      // Bind by reference instead of copying the whole TensorTableEntry;
      // only its device and context are read here.
      const auto& first_entry = entries.front();
      // Note: it is OK for different entries to come from different frameworks
      // since buffer allocated here is guaranteed to survive at least till the
      // end of this operation.
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        complete_entries(status);
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event. Track pointers into
    // `entries` rather than copies; `entries` is not resized while these
    // pointers are in use.
    std::vector<TensorTableEntry*> waiting_tensors;
    waiting_tensors.reserve(entries.size());
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(&e);
      }
    }
    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        TensorTableEntry* e = *it;
        if (e->ready_event->Ready()) {
          timeline.ActivityEnd(e->tensor_name);
          timeline.ActivityStart(e->tensor_name, WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      std::this_thread::sleep_for(std::chrono::nanoseconds(100));
    }
    // Close the WAIT_FOR_OTHER_TENSOR_DATA activity opened in the loop above.
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityEnd(e.tensor_name);
      }
    }
  }

  Status status;
  try {
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    status = Status::UnknownError(ex.what());
  }

  // An in-progress status means the operation has not finished yet; its
  // completion is presumably reported elsewhere -- confirm.
  if (!status.in_progress()) {
    complete_entries(status);
  }
}