void PerformOperation()

in horovod/common/operations.cc [253:330]


// Executes the collective operation described by `response`.
//
// Steps, in order:
//   1. Unless the response is a JOIN, collect the matching tensor entries
//      from the tensor queue and start a timeline record for each of them.
//   2. When more than one entry is returned, initialize the fusion buffer.
//      If that fails, every entry's timeline record is closed, its callback
//      (if any) receives the failure status, and the function returns early.
//   3. Poll the ready events of the entries that have one until all of them
//      report ready.
//   4. Hand the entries to the operation manager. Exceptions derived from
//      std::exception are converted into an UnknownError status.
//   5. Unless the resulting status is "in progress", close the timeline
//      record of every entry and invoke its callback with the final status.
//
// NOTE(review): this function reads both the `state` parameter and the
// `horovod_global` global; presumably they refer to the same object --
// confirm against the caller.
void PerformOperation(Response response, HorovodGlobalState& state) {
  std::vector<TensorTableEntry> entries;
  auto& timeline = horovod_global.timeline;
  if (response.response_type() != Response::JOIN) {
    horovod_global.tensor_queue.GetTensorEntriesFromResponse(response, entries,
                                                             state.joined);

    for (auto& e : entries) {
      timeline.Start(e.tensor_name, response.response_type());
    }

    if (entries.size() > 1) {
      // Bind by reference: only `device` and `context` are read, so copying
      // the whole entry is unnecessary.
      auto& first_entry = entries[0];
      // Note: it is OK for different entries to come from different frameworks
      // since buffer allocated here is guaranteed to survive at least till the
      // end of this operation.
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          horovod_global.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
      if (!status.ok()) {
        for (auto& e : entries) {
          timeline.End(e.tensor_name, nullptr);
          // Callback can be null if the rank sent Join request.
          if (e.callback != nullptr) {
            e.callback(status);
          }
        }
        return;
      }
    }

    // On GPU data readiness is signalled by ready_event.
    // Track pending entries by pointer instead of copying them: `entries` is
    // not modified until the wait is over, so the addresses stay valid.
    std::vector<TensorTableEntry*> waiting_tensors;
    waiting_tensors.reserve(entries.size());
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityStart(e.tensor_name, WAIT_FOR_DATA);
        waiting_tensors.push_back(&e);
      }
    }
    while (!waiting_tensors.empty()) {
      for (auto it = waiting_tensors.begin(); it != waiting_tensors.end();) {
        TensorTableEntry& pending = **it;
        if (pending.ready_event->Ready()) {
          // This tensor is ready; from here on it only waits for the
          // remaining tensors of the same response.
          timeline.ActivityEnd(pending.tensor_name);
          timeline.ActivityStart(pending.tensor_name,
                                 WAIT_FOR_OTHER_TENSOR_DATA);
          it = waiting_tensors.erase(it);
        } else {
          ++it;
        }
      }
      // Back off only while something is still pending; sleeping after the
      // last tensor became ready would just delay the operation.
      if (!waiting_tensors.empty()) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(100));
      }
    }
    // Close the WAIT_FOR_OTHER_TENSOR_DATA activity opened above.
    for (auto& e : entries) {
      if (e.ready_event != nullptr) {
        timeline.ActivityEnd(e.tensor_name);
      }
    }
  }

  Status status;
  try {
    status = op_manager->ExecuteOperation(entries, response);
  } catch (const std::exception& ex) {
    status = Status::UnknownError(ex.what());
  }

  // An in-progress status leaves the entries untouched here; presumably the
  // operation finishes them itself later -- confirm in the op implementation.
  if (!status.in_progress()) {
    for (auto& e : entries) {
      timeline.End(e.tensor_name, status.ok() ? e.output : nullptr);
      // Callback can be null if the rank sent Join request.
      if (e.callback != nullptr) {
        e.callback(status);
      }
    }
  }
}