in horovod/common/operations.cc [354:564]
// Main loop of the Horovod background thread.
//
// Initializes all enabled communication contexts (CCL/MPI/Gloo/NCCL/GPU),
// configures tunable parameters from environment variables, runs the
// progress loop until shutdown is requested, and then tears everything
// down in reverse order of initialization.
//
// NOTE(review): this runs on a dedicated thread owned by `horovod_global`;
// `state` is presumed to alias `horovod_global` (see mixed uses below) —
// confirm against the caller.
void BackgroundThreadLoop(HorovodGlobalState& state) {
#if HAVE_CCL
  // Initialize CCL context when CPU collectives are delegated to oneCCL.
  if (state.cpu_operation == LibType::CCL) {
    ccl_context.Init();
  }
#endif

#if HAVE_MPI
  // Initialize MPI context.
#if HAVE_DDL
  // If DDL is enabled, let DDL ops manage the MPI environment.
  auto mpi_ctx_manager = DDL_MPIContextManager(ddl_context, gpu_context);
#else
  // Otherwise, let MPI ops be in charge.
  auto mpi_ctx_manager = MPIContextManager();
#endif
  mpi_context.Initialize(state.controller->GetRanks(), mpi_ctx_manager);
#endif

#if HAVE_GLOO
#if HAVE_MPI
  if (mpi_context.IsEnabled()) {
    // Initialize Gloo context if an MPI context is available.
    gloo_context.InitializeFromMPI(mpi_context, ParseGlooIface());
  } else
#endif
  {
    gloo_context.Initialize(ParseGlooIface());
  }
#endif

  // Initialize controller and cache the topology facts used below.
  state.controller->Initialize();

  bool is_coordinator = state.controller->IsCoordinator();
  bool is_homogeneous = state.controller->IsHomogeneous();
  int size = state.controller->GetSize();
  int local_size = state.controller->GetLocalSize();
  int local_rank = state.controller->GetLocalRank();

  // Set background thread affinity, if requested via env.
  parse_and_set_affinity(std::getenv(HOROVOD_THREAD_AFFINITY), local_size,
                         local_rank);

#if HAVE_GPU
  // Set number of GPU streams to use. Parse the env var exactly once with
  // strtol (non-throwing on malformed input; the original stol/atoi pair
  // could throw out of this thread and re-parsed the string twice).
  auto horovod_num_nccl_streams = std::getenv(HOROVOD_NUM_NCCL_STREAMS);
  if (horovod_num_nccl_streams != nullptr) {
    auto requested_streams =
        std::strtol(horovod_num_nccl_streams, nullptr, 10);
    if (requested_streams > 0) {
      state.num_nccl_streams = static_cast<int>(requested_streams);
    }
  }

#if HAVE_NCCL
  nccl_context.nccl_comms.resize(state.num_nccl_streams);
#endif
  gpu_context.streams.resize(state.num_nccl_streams);

  // Create finalizer thread pool (one thread per stream).
  gpu_context.finalizer_thread_pool.create(state.num_nccl_streams);
#endif

  // Open the timeline file on the coordinator rank only; all ranks enable
  // timeline bookkeeping when the env var is present.
  auto horovod_timeline = std::getenv(HOROVOD_TIMELINE);
  if (is_coordinator && horovod_timeline != nullptr) {
    state.timeline.Initialize(std::string(horovod_timeline),
                              static_cast<unsigned int>(size));
  }
  if (horovod_timeline != nullptr) {
    state.controller->SetTimelineEnabled(true);
  }

  ParseStallInspectorFromEnv(state.controller->GetStallInspector());

  SetBoolFromEnv(HOROVOD_TIMELINE_MARK_CYCLES, state.mark_cycles_in_timeline,
                 true);

  // Override Tensor Fusion threshold, if it's set (default 64 MB).
  state.parameter_manager.SetTensorFusionThresholdBytes(64 * 1024 * 1024);
  auto horovod_fusion_threshold = std::getenv(HOROVOD_FUSION_THRESHOLD);
  if (horovod_fusion_threshold != nullptr) {
    int64_t threshold = std::strtol(horovod_fusion_threshold, nullptr, 10);
    state.parameter_manager.SetTensorFusionThresholdBytes(threshold, true);
  }

  // Override the cycle time, if it's set (default 5 ms).
  state.parameter_manager.SetCycleTimeMs(5);
  auto horovod_cycle_time = std::getenv(HOROVOD_CYCLE_TIME);
  if (horovod_cycle_time != nullptr) {
    state.parameter_manager.SetCycleTimeMs(
        std::strtof(horovod_cycle_time, nullptr), true);
  }

  // Override response cache capacity, if it's set. A capacity of 0
  // disables the cache entirely.
  state.parameter_manager.SetCacheEnabled(true);
  auto horovod_cache_capacity = std::getenv(HOROVOD_CACHE_CAPACITY);
  if (horovod_cache_capacity != nullptr) {
    uint32_t cache_capacity = std::strtol(horovod_cache_capacity, nullptr, 10);
    state.cache_capacity = cache_capacity;
    state.parameter_manager.SetCacheEnabled(cache_capacity > 0, true);
  }
  state.response_cache.set_capacity(
      (int)state.parameter_manager.CacheEnabled() * state.cache_capacity);

  // Set flag for hierarchical allgather. Ignore if Horovod is running on a
  // single node (size == local_size).
  auto horovod_hierarchical_allgather =
      std::getenv(HOROVOD_HIERARCHICAL_ALLGATHER);
  state.parameter_manager.SetHierarchicalAllgather(false);
  if (horovod_hierarchical_allgather != nullptr) {
    bool value = std::strtol(horovod_hierarchical_allgather, nullptr, 10) > 0 &&
                 (size != local_size);
    state.parameter_manager.SetHierarchicalAllgather(value, true);
  }

  // Set flag for hierarchical allreduce. Ignore if Horovod is running on a
  // single node.
  auto horovod_hierarchical_allreduce =
      std::getenv(HOROVOD_HIERARCHICAL_ALLREDUCE);
  state.parameter_manager.SetHierarchicalAllreduce(false);
  if (horovod_hierarchical_allreduce != nullptr) {
    bool value = std::strtol(horovod_hierarchical_allreduce, nullptr, 10) > 0 &&
                 (size != local_size);
    state.parameter_manager.SetHierarchicalAllreduce(value, true);
  }

#if HOROVOD_GPU_ALLREDUCE != 'N' && HOROVOD_GPU_ALLREDUCE != 'D'
  // Hierarchical allreduce is not supported without NCCL or DDL.
  state.parameter_manager.SetHierarchicalAllreduce(false, true);
#endif

  // Issue warning if hierarchical collectives are enabled in a
  // heterogeneous cluster (different number of ranks per node).
  if (is_coordinator &&
      (state.parameter_manager.HierarchicalAllreduce() ||
       state.parameter_manager.HierarchicalAllgather()) &&
      !is_homogeneous) {
    // std::endl terminates the line and flushes so the warning is not lost
    // or glued onto later output (the original message had no newline).
    std::cerr
        << "WARNING: Using different number of ranks per node might cause "
           "performance loss in hierarchical allgather and "
           "hierarchical allreduce. Consider assigning the same "
           "number of ranks to each node, or disabling hierarchical "
           "allgather and hierarchical allreduce." << std::endl;
  }

  // Enable auto-tuning of the parameters set above, if requested.
  auto horovod_autotune = std::getenv(HOROVOD_AUTOTUNE);
  if (horovod_autotune != nullptr &&
      std::strtol(horovod_autotune, nullptr, 10) > 0) {
    auto horovod_autotune_log = std::getenv(HOROVOD_AUTOTUNE_LOG);
    state.parameter_manager.Initialize(state.controller->GetRank(), RANK_ZERO,
                                       horovod_autotune_log != nullptr
                                           ? std::string(horovod_autotune_log)
                                           : "");
    state.parameter_manager.SetAutoTuning(true);
  }

  // Set chunk size for MPI-based Adasum allreduce algorithms.
  auto horovod_adasum_mpi_chunk_size =
      std::getenv(HOROVOD_ADASUM_MPI_CHUNK_SIZE);
  if (horovod_adasum_mpi_chunk_size != nullptr) {
    state.adasum_mpi_chunk_size =
        std::strtol(horovod_adasum_mpi_chunk_size, nullptr, 10);
  }

  op_manager.reset(CreateOperationManager(state));

  // Signal that initialization is completed.
  state.initialization_done = true;
  LOG(INFO, horovod_global.controller->GetRank()) << "Horovod Initialized";

  // Iterate until shutdown. Catch anything escaping RunLoopOnce so the
  // teardown below still runs and the process does not std::terminate.
  try {
    while (RunLoopOnce(state));
  } catch (const std::exception& ex) {
    LOG(ERROR) << "Horovod background loop uncaught exception: " << ex.what();
  }

  // Finalize all contexts in roughly reverse order of initialization.
#if HAVE_NCCL
  nccl_context.ShutDown();
#endif

#if HAVE_GLOO
  gloo_context.Finalize();
#endif

  LOG(DEBUG, horovod_global.controller->GetRank())
      << "Shutting down background thread";

  // Signal that shutdown has been requested.
  state.shut_down = true;

  // Notify all outstanding operations that Horovod has been shut down
  // and finalize the tensor queue.
  std::vector<StatusCallback> callbacks;
  horovod_global.tensor_queue.FinalizeTensorQueue(callbacks);
  for (auto& cb : callbacks) {
    cb(SHUT_DOWN_ERROR);
  }

#if HAVE_GPU
  gpu_context.Finalize();
#endif

#if HAVE_MPI
  mpi_context.Finalize(mpi_ctx_manager);
#endif

#if HAVE_CCL
  if (state.cpu_operation == LibType::CCL) {
    ccl_context.Finalize();
  }
#endif
}