void BackgroundThreadLoop()

in horovod/common/operations.cc [354:564]


// Entry point of the Horovod background thread.
//
// Sets up whichever communication contexts this build was compiled with
// (CCL, MPI, Gloo, GPU/NCCL), initializes the controller, applies the
// HOROVOD_* environment-variable overrides to `state`, and creates the
// operation manager. It then sets `state.initialization_done` and calls
// RunLoopOnce(state) repeatedly until it returns false (an exception thrown
// from the loop is logged and also ends it). On exit it finalizes the
// contexts, sets `state.shut_down`, and invokes every callback still pending
// in the tensor queue with SHUT_DOWN_ERROR.
//
// Numeric environment variables are parsed with std::strtol so a malformed
// value falls back to the default instead of throwing: everything before the
// run loop executes outside the try block below.
void BackgroundThreadLoop(HorovodGlobalState& state) {
#if HAVE_CCL
  // Initialize ccl context
  if (state.cpu_operation == LibType::CCL) {
    ccl_context.Init();
  }
#endif

#if HAVE_MPI
  // Initialize mpi context
#if HAVE_DDL
  // If DDL is enabled, let DDL ops manage MPI environment.
  auto mpi_ctx_manager = DDL_MPIContextManager(ddl_context, gpu_context);
#else
  // Otherwise, let MPI ops be in charge.
  auto mpi_ctx_manager = MPIContextManager();
#endif
  mpi_context.Initialize(state.controller->GetRanks(), mpi_ctx_manager);
#endif

#if HAVE_GLOO
#if HAVE_MPI
  if (mpi_context.IsEnabled()) {
    // Initialize gloo context if mpi context is available
    gloo_context.InitializeFromMPI(mpi_context, ParseGlooIface());
  }
  else
#endif
  {
    gloo_context.Initialize(ParseGlooIface());
  }
#endif
  // Initialize controller
  state.controller->Initialize();

  bool is_coordinator = state.controller->IsCoordinator();
  bool is_homogeneous = state.controller->IsHomogeneous();
  int size = state.controller->GetSize();
  int local_size = state.controller->GetLocalSize();
  int local_rank = state.controller->GetLocalRank();

  // Set background thread affinity
  parse_and_set_affinity(std::getenv(HOROVOD_THREAD_AFFINITY), local_size, local_rank);

#if HAVE_GPU
  // Set number of GPU streams to use. Parse once with strtol: unlike
  // std::stol it does not throw on malformed input (it yields 0, which the
  // positivity check below ignores), and unlike atoi it is well-defined on
  // overflow.
  auto horovod_num_nccl_streams = std::getenv(HOROVOD_NUM_NCCL_STREAMS);
  if (horovod_num_nccl_streams != nullptr) {
    long num_streams = std::strtol(horovod_num_nccl_streams, nullptr, 10);
    if (num_streams > 0) {
      state.num_nccl_streams = static_cast<int>(num_streams);
    }
  }

#if HAVE_NCCL
  nccl_context.nccl_comms.resize(state.num_nccl_streams);
#endif
  gpu_context.streams.resize(state.num_nccl_streams);

  // Create finalizer thread pool (one thread per stream)
  gpu_context.finalizer_thread_pool.create(state.num_nccl_streams);
#endif

  // Open the timeline file on coordinator.
  auto horovod_timeline = std::getenv(HOROVOD_TIMELINE);
  if (is_coordinator && horovod_timeline != nullptr) {
    state.timeline.Initialize(std::string(horovod_timeline),
                              static_cast<unsigned int>(size));
  }
  if (horovod_timeline != nullptr) {
    state.controller->SetTimelineEnabled(true);
  }

  ParseStallInspectorFromEnv(state.controller->GetStallInspector());

  SetBoolFromEnv(HOROVOD_TIMELINE_MARK_CYCLES, state.mark_cycles_in_timeline,
                 true);

  // Override Tensor Fusion threshold, if it's set.
  state.parameter_manager.SetTensorFusionThresholdBytes(64 * 1024 * 1024);
  auto horovod_fusion_threshold = std::getenv(HOROVOD_FUSION_THRESHOLD);
  if (horovod_fusion_threshold != nullptr) {
    int64_t threshold = std::strtol(horovod_fusion_threshold, nullptr, 10);
    state.parameter_manager.SetTensorFusionThresholdBytes(threshold, true);
  }

  // Override the cycle time.
  state.parameter_manager.SetCycleTimeMs(5);
  auto horovod_cycle_time = std::getenv(HOROVOD_CYCLE_TIME);
  if (horovod_cycle_time != nullptr) {
    state.parameter_manager.SetCycleTimeMs(
        std::strtof(horovod_cycle_time, nullptr), true);
  }

  // Override response cache capacity, if it's set.
  state.parameter_manager.SetCacheEnabled(true);
  auto horovod_cache_capacity = std::getenv(HOROVOD_CACHE_CAPACITY);
  if (horovod_cache_capacity != nullptr) {
    long requested_capacity = std::strtol(horovod_cache_capacity, nullptr, 10);
    // A negative value must not wrap around to a huge unsigned capacity;
    // treat it like 0 (cache disabled).
    uint32_t cache_capacity =
        requested_capacity > 0 ? static_cast<uint32_t>(requested_capacity) : 0;
    state.cache_capacity = cache_capacity;
    state.parameter_manager.SetCacheEnabled(cache_capacity > 0, true);
  }
  // A disabled cache gets zero capacity.
  state.response_cache.set_capacity(
      state.parameter_manager.CacheEnabled() ? state.cache_capacity : 0);

  // Set flag for hierarchical allgather. Ignore if Horovod is running on a
  // single node.
  auto horovod_hierarchical_allgather =
      std::getenv(HOROVOD_HIERARCHICAL_ALLGATHER);
  state.parameter_manager.SetHierarchicalAllgather(false);
  if (horovod_hierarchical_allgather != nullptr) {
    bool value = std::strtol(horovod_hierarchical_allgather, nullptr, 10) > 0 &&
                 (size != local_size);
    state.parameter_manager.SetHierarchicalAllgather(value, true);
  }
  // Set flag for hierarchical allreduce. Ignore if Horovod is running on a
  // single node.
  auto horovod_hierarchical_allreduce =
      std::getenv(HOROVOD_HIERARCHICAL_ALLREDUCE);
  state.parameter_manager.SetHierarchicalAllreduce(false);
  if (horovod_hierarchical_allreduce != nullptr) {
    bool value = std::strtol(horovod_hierarchical_allreduce, nullptr, 10) > 0 &&
                 (size != local_size);
    state.parameter_manager.SetHierarchicalAllreduce(value, true);
  }

#if HOROVOD_GPU_ALLREDUCE != 'N' && HOROVOD_GPU_ALLREDUCE != 'D'
  // Hierarchical allreduce is not supported without NCCL or DDL
  state.parameter_manager.SetHierarchicalAllreduce(false, true);
#endif

  // Issue warning if hierarchical allreduce is enabled in heterogeneous cluster
  if (is_coordinator &&
      (state.parameter_manager.HierarchicalAllreduce() ||
       state.parameter_manager.HierarchicalAllgather()) &&
      !is_homogeneous) {
    std::cerr
        << "WARNING: Using different number of ranks per node might cause "
           "performance loss in hierarchical allgather and "
           "hierarchical allreduce. Consider assigning the same "
           "number of ranks to each node, or disabling hierarchical "
           "allgather and hierarchical allreduce."
        << std::endl;
  }

  // Enable auto-tuning.
  auto horovod_autotune = std::getenv(HOROVOD_AUTOTUNE);
  if (horovod_autotune != nullptr &&
      std::strtol(horovod_autotune, nullptr, 10) > 0) {
    auto horovod_autotune_log = std::getenv(HOROVOD_AUTOTUNE_LOG);
    state.parameter_manager.Initialize(state.controller->GetRank(), RANK_ZERO,
                                       horovod_autotune_log != nullptr
                                           ? std::string(horovod_autotune_log)
                                           : "");
    state.parameter_manager.SetAutoTuning(true);
  }

  // Set chunk size for MPI based Adasum allreduce algorithms
  auto horovod_adasum_mpi_chunk_size = std::getenv(HOROVOD_ADASUM_MPI_CHUNK_SIZE);
  if (horovod_adasum_mpi_chunk_size != nullptr) {
    state.adasum_mpi_chunk_size = std::strtol(horovod_adasum_mpi_chunk_size, nullptr, 10);
  }

  op_manager.reset(CreateOperationManager(state));

  // Signal that initialization is completed.
  state.initialization_done = true;
  LOG(INFO, state.controller->GetRank()) << "Horovod Initialized";

  // Iterate until shutdown.
  try {
    while (RunLoopOnce(state));
  } catch (const std::exception& ex) {
    LOG(ERROR) << "Horovod background loop uncaught exception: " << ex.what();
  }

  // Finalize all contexts
#if HAVE_NCCL
  nccl_context.ShutDown();
#endif

#if HAVE_GLOO
  gloo_context.Finalize();
#endif

  LOG(DEBUG, state.controller->GetRank()) << "Shutting down background thread";

  // Signal that shutdown has been requested.
  state.shut_down = true;

  // Notify all outstanding operations that Horovod has been shut down
  // and finalize tensor queue.
  std::vector<StatusCallback> callbacks;
  state.tensor_queue.FinalizeTensorQueue(callbacks);
  for (auto& cb : callbacks) {
    cb(SHUT_DOWN_ERROR);
  }

#if HAVE_GPU
  gpu_context.Finalize();
#endif

#if HAVE_MPI
  mpi_context.Finalize(mpi_ctx_manager);
#endif

#if HAVE_CCL
  if (state.cpu_operation == LibType::CCL){
    ccl_context.Finalize();
  }
#endif

}