horovod/common/ops/gloo_operations.cc (255 lines of code) (raw):
// Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================
#include "gloo_operations.h"
#include "gloo/allgather.h"
#include "gloo/allgatherv.h"
#include "gloo/allreduce.h"
#include "gloo/alltoallv.h"
#include "gloo/broadcast.h"
#include "gloo/math.h"
#include "gloo/types.h"
#include "../common.h"
#include "../global_state.h"
namespace horovod {
namespace common {
// Creates the typed Gloo collective wrapper that matches a Horovod data type.
//
// dtype:        element type the collective will operate on.
// gloo_context: forwarded unchanged to the GlooAlgorithms<T> constructor.
//
// Returns a GlooAlgorithms<T> allocated with `new`; the caller is responsible
// for releasing it (the callers in this file wrap it in a std::unique_ptr).
// Throws std::logic_error when `dtype` has no case below.
IGlooAlgorithms* GetAlgorithmsForType(DataType dtype,
                                      GlooContext* gloo_context) {
  switch (dtype) {
  // The unsigned cases use the standard <cstdint> names; u_int8_t and
  // u_int16_t are not part of the C++ standard.
  case HOROVOD_UINT8:
    return new GlooAlgorithms<uint8_t>(gloo_context);
  case HOROVOD_INT8:
    return new GlooAlgorithms<int8_t>(gloo_context);
  case HOROVOD_UINT16:
    return new GlooAlgorithms<uint16_t>(gloo_context);
  case HOROVOD_INT16:
    return new GlooAlgorithms<int16_t>(gloo_context);
  case HOROVOD_INT32:
    return new GlooAlgorithms<int32_t>(gloo_context);
  case HOROVOD_INT64:
    return new GlooAlgorithms<int64_t>(gloo_context);
  case HOROVOD_FLOAT16:
    return new GlooAlgorithms<gloo::float16>(gloo_context);
  case HOROVOD_FLOAT32:
    return new GlooAlgorithms<float>(gloo_context);
  case HOROVOD_FLOAT64:
    return new GlooAlgorithms<double>(gloo_context);
  case HOROVOD_BOOL:
    return new GlooAlgorithms<bool>(gloo_context);
  default:
    throw std::logic_error("Type " + DataType_Name(dtype) +
                           " is not supported in Gloo mode.");
  }
}
template <typename T>
GlooAlgorithms<T>::GlooAlgorithms(GlooContext* gloo_context)
: gloo_context_(gloo_context) {}
// Sum-allreduces `num_elements` values of type T held in `buffer_data`.
// Only an output buffer is registered with Gloo; no separate input is set.
template <typename T>
void GlooAlgorithms<T>::Allreduce(void* buffer_data, int num_elements) {
  using ReduceFn = void (*)(void*, const void*, const void*, size_t);

  T* const typed_buffer = static_cast<T*>(buffer_data);
  const auto element_count = static_cast<size_t>(num_elements);
  // Spelling out the pointer type selects the gloo::sum<T> signature before it
  // is wrapped in AllreduceOptions::Func.
  ReduceFn sum_fn = &::gloo::sum<T>;

  gloo::AllreduceOptions options(gloo_context_->ctx);
  options.setOutput<T>(typed_buffer, element_count);
  options.setReduceFunction(gloo::AllreduceOptions::Func(sum_fn));
  gloo::allreduce(options);
}
// Variable-length allgather of T elements.
//
// buffer_data: base pointer; this rank's contribution starts at
//              displcmnts[rank] elements from it.
// buffer_out:  destination registered as the allgatherv output.
// recvcounts:  element count per rank (one int per rank in the context).
// displcmnts:  element displacement per rank; only this rank's slot is read.
template <typename T>
void GlooAlgorithms<T>::Allgather(void* buffer_data, void* buffer_out,
                                  int* recvcounts, int* displcmnts) {
  auto& context = gloo_context_->ctx;
  const auto my_rank = context->rank;

  // Widen the int counts into the size_t vector handed to setOutput below.
  std::vector<size_t> elements_per_rank(recvcounts,
                                        recvcounts + context->size);

  T* const send_begin = static_cast<T*>(buffer_data) + displcmnts[my_rank];
  T* const recv_begin = static_cast<T*>(buffer_out);

  gloo::AllgathervOptions options(context);
  options.setInput<T>(send_begin, elements_per_rank[my_rank]);
  options.setOutput<T>(recv_begin, elements_per_rank);
  gloo::allgatherv(options);
}
// Broadcasts `num_elements` values of type T from `root_rank`.
// `buffer_data` is registered as the output buffer; no separate input is set.
template <typename T>
void GlooAlgorithms<T>::Broadcast(void* buffer_data, int num_elements,
                                  int root_rank) {
  T* const typed_buffer = static_cast<T*>(buffer_data);
  const auto element_count = static_cast<size_t>(num_elements);

  gloo::BroadcastOptions options(gloo_context_->ctx);
  options.setRoot(root_rank);
  options.setOutput<T>(typed_buffer, element_count);
  gloo::broadcast(options);
}
// Variable-length all-to-all exchange of T elements.
//
// buffer_data / sendcounts: send buffer and the counts registered as input.
// buffer_out / recvcounts:  receive buffer and the counts registered as output.
template <typename T>
void GlooAlgorithms<T>::Alltoall(void* buffer_data, void* buffer_out,
                                 std::vector<int64_t>& sendcounts,
                                 std::vector<int64_t>& recvcounts) {
  T* const send_buffer = static_cast<T*>(buffer_data);
  T* const recv_buffer = static_cast<T*>(buffer_out);

  gloo::AlltoallvOptions options(gloo_context_->ctx);
  options.setInput<T>(send_buffer, sendcounts);
  options.setOutput<T>(recv_buffer, recvcounts);
  gloo::alltoallv(options);
}
// Size in bytes of a single element of type T.
template <typename T>
int GlooAlgorithms<T>::ElementSize() const {
  return static_cast<int>(sizeof(T));
}
GlooAllreduce::GlooAllreduce(GlooContext* gloo_context,
HorovodGlobalState* global_state)
: AllreduceOp(global_state), gloo_context_(gloo_context) {}
// Runs an allreduce over `entries` with Gloo.
//
// Steps: stage the data (MemcpyInFusionBuffer for several entries, the entry's
// own output tensor for a single one), optionally prescale, allreduce in
// place, optionally postscale, and copy fused results back to the entries.
//
// entries:  tensors to reduce; entries[0] must exist and its dtype selects
//           the Gloo algorithm instantiation.
// response: supplies the prescale and postscale factors.
//
// Returns Status::OK(). GetAlgorithmsForType throws std::logic_error for a
// dtype it does not handle.
Status GlooAllreduce::Execute(std::vector<TensorTableEntry>& entries,
                              const Response& response) {
  auto& first_entry = entries[0];
  auto& timeline = global_state_->timeline;

  const void* fused_input_data = nullptr;
  void* buffer_data = nullptr;
  // NOTE(review): the element count is narrowed to int because
  // IGlooAlgorithms::Allreduce takes an int; a count beyond INT_MAX would not
  // survive this cast -- confirm callers stay below that.
  int num_elements = static_cast<int>(NumElements(entries));

  // Copy memory into the fusion buffer.
  if (entries.size() > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_IN_FUSION_BUFFER);
    size_t buffer_len = 0;
    MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len);
    timeline.ActivityEndAll(entries);
  } else {
    buffer_data = (void*)first_entry.output->data();
    const void* input_data = first_entry.tensor->data();
    // When input and output are the same buffer the copy is a no-op, and
    // memcpy on overlapping ranges is undefined behaviour -- skip it.
    if (input_data != buffer_data) {
      std::memcpy(buffer_data, input_data,
                  static_cast<size_t>(first_entry.tensor->size()));
    }
    fused_input_data = buffer_data;
  }

  if (response.prescale_factor() != 1.0) {
    // Execute prescaling op
    ScaleBuffer(response.prescale_factor(), entries, fused_input_data,
                buffer_data, num_elements);
  }

  // Do allreduce.
  timeline.ActivityStartAll(entries, GLOO_ALLREDUCE);
  std::unique_ptr<IGlooAlgorithms> gloo_algos(
      GetAlgorithmsForType(first_entry.tensor->dtype(), gloo_context_));
  gloo_algos->Allreduce(buffer_data, num_elements);
  timeline.ActivityEndAll(entries);

  if (response.postscale_factor() != 1.0) {
    // Execute postscaling op
    ScaleBuffer(response.postscale_factor(), entries, buffer_data, buffer_data,
                num_elements);
  }

  // Copy memory out of the fusion buffer.
  if (entries.size() > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_OUT_FUSION_BUFFER);
    MemcpyOutFusionBuffer(buffer_data, entries);
    timeline.ActivityEndAll(entries);
  }

  return Status::OK();
}
// Always reports the Gloo allreduce as usable; none of the arguments is
// inspected.
bool GlooAllreduce::Enabled(const ParameterManager& param_manager,
                            const std::vector<TensorTableEntry>& entries,
                            const Response& response) const {
  static_cast<void>(param_manager);
  static_cast<void>(entries);
  static_cast<void>(response);
  return true;
}
GlooAllgather::GlooAllgather(GlooContext* gloo_context,
HorovodGlobalState* global_state)
: AllgatherOp(global_state), gloo_context_(gloo_context) {}
// Always reports the Gloo allgather as usable; none of the arguments is
// inspected.
bool GlooAllgather::Enabled(const ParameterManager& param_manager,
                            const std::vector<TensorTableEntry>& entries,
                            const Response& response) const {
  static_cast<void>(param_manager);
  static_cast<void>(entries);
  static_cast<void>(response);
  return true;
}
// Gathers the tensors in `entries` from every rank via Gloo allgatherv.
//
// Flow: AllocateOutput fills the per-rank counts, SetDisplacements and
// SetEntryComponentOffsets derive where each piece lands, this rank's data is
// staged at its displacement in the receive buffer (MemcpyInFusionBuffer for
// several entries, the entry's output tensor for one), the allgather runs,
// and fused results are unpacked with MemcpyOutFusionBuffer.
//
// Returns the failing Status from AllocateOutput, otherwise Status::OK().
// GetAlgorithmsForType throws std::logic_error for an unsupported dtype.
Status GlooAllgather::Execute(std::vector<TensorTableEntry>& entries,
                              const Response& response) {
  auto& timeline = global_state_->timeline;
  auto& first_entry = entries[0];

  const size_t num_entries = entries.size();
  const int global_size = global_state_->controller->GetSize();
  const auto num_ranks = static_cast<size_t>(global_size);

  // Zero-initialised bookkeeping owned by vectors, so it is released on every
  // exit path, including an exception from GetAlgorithmsForType; manually
  // managed new[] arrays would leak there.
  std::vector<int> recvcounts_storage(num_ranks, 0);
  std::vector<int> displcmnts_storage(num_ranks, 0);
  // Sizes / offsets of each entry's per-rank component: one row of
  // `num_ranks` values per entry, rows stored back to back.
  std::vector<int64_t> sizes_storage(num_entries * num_ranks, 0);
  std::vector<int64_t> offsets_storage(num_entries * num_ranks, 0);
  std::vector<int64_t*> size_rows(num_entries);
  std::vector<int64_t*> offset_rows(num_entries);
  for (size_t ec = 0; ec < num_entries; ++ec) {
    size_rows[ec] = sizes_storage.data() + ec * num_ranks;
    offset_rows[ec] = offsets_storage.data() + ec * num_ranks;
  }

  // The helpers called below are declared outside this file; hand them plain
  // pointer lvalues of the types int64_t** / int* so the calls bind the same
  // way regardless of how their parameters are declared.
  int64_t** entry_component_sizes = size_rows.data();
  int64_t** entry_component_offsets = offset_rows.data();
  int* recvcounts = recvcounts_storage.data();
  int* displcmnts = displcmnts_storage.data();

  timeline.ActivityStartAll(entries, ALLOCATE_OUTPUT);
  Status status =
      AllocateOutput(entries, response, entry_component_sizes, recvcounts);
  if (!status.ok()) {
    // NOTE(review): the ALLOCATE_OUTPUT activity is not ended on this path;
    // confirm the caller deals with that.
    return status;
  }
  timeline.ActivityEndAll(entries);

  SetDisplacements(recvcounts, displcmnts);
  SetEntryComponentOffsets(entries, entry_component_sizes, recvcounts,
                           entry_component_offsets);

  std::unique_ptr<IGlooAlgorithms> gloo_algos(
      GetAlgorithmsForType(first_entry.tensor->dtype(), gloo_context_));
  int element_size = gloo_algos->ElementSize();

  void* sendbuf = nullptr;
  void* buffer_data = nullptr;
  if (num_entries > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_IN_FUSION_BUFFER);
    MemcpyInFusionBuffer(entries, displcmnts, element_size, buffer_data);
    sendbuf = buffer_data;
    timeline.ActivityEndAll(entries);
  } else {
    // need to move input data to its corresponding location in the output
    const void* input_data = first_entry.tensor->data();
    buffer_data = (void*)first_entry.output->data();
    // Byte offset computed in 64 bits: displacement * element size can exceed
    // the range of int for large gathers.
    const int64_t buffer_offset =
        static_cast<int64_t>(displcmnts[gloo_context_->ctx->rank]) *
        element_size;
    std::memcpy(static_cast<uint8_t*>(buffer_data) + buffer_offset, input_data,
                static_cast<size_t>(first_entry.tensor->size()));
    sendbuf = buffer_data;
  }

  // call gloo allgather api
  timeline.ActivityStartAll(entries, GLOO_ALLGATHER);
  gloo_algos->Allgather(sendbuf, buffer_data, recvcounts, displcmnts);
  timeline.ActivityEndAll(entries);

  // if multiple tensors are gathered, restore the sequence from output
  if (num_entries > 1) {
    timeline.ActivityStartAll(entries, MEMCPY_OUT_FUSION_BUFFER);
    MemcpyOutFusionBuffer(entry_component_offsets, entry_component_sizes,
                          buffer_data, element_size, entries);
    timeline.ActivityEndAll(entries);
  }

  return Status::OK();
}
GlooBroadcast::GlooBroadcast(GlooContext* gloo_context,
HorovodGlobalState* global_state)
: BroadcastOp(global_state), gloo_context_(gloo_context) {}
// Broadcasts the single entry in `entries` from its root rank with Gloo.
//
// entries:  must hold exactly one entry (asserted).
// response: not used by this implementation.
//
// Returns Status::OK(). GetAlgorithmsForType throws std::logic_error for a
// dtype it does not handle.
Status GlooBroadcast::Execute(std::vector<TensorTableEntry>& entries,
                              const Response& response) {
  assert(entries.size() == 1);
  // Bind by reference: nothing here needs a private copy of the entry, and
  // GlooAllreduce / GlooAllgather access entries[0] the same way.
  auto& e = entries[0];

  // Gloo's broadcast is given a single buffer: the root rank passes its input
  // tensor, every other rank passes its output tensor.
  const bool is_root = global_state_->controller->GetRank() == e.root_rank;
  void* data_ptr =
      is_root ? (void*)e.tensor->data() : (void*)e.output->data();

  global_state_->timeline.ActivityStartAll(entries, GLOO_BCAST);
  std::unique_ptr<IGlooAlgorithms> gloo_algos(
      GetAlgorithmsForType(e.tensor->dtype(), gloo_context_));
  // NOTE(review): the element count is narrowed to int for
  // IGlooAlgorithms::Broadcast; confirm counts stay within int range.
  gloo_algos->Broadcast(data_ptr,
                        static_cast<int>(e.tensor->shape().num_elements()),
                        e.root_rank);
  global_state_->timeline.ActivityEndAll(entries);

  return Status::OK();
}
// Always reports the Gloo broadcast as usable; none of the arguments is
// inspected.
bool GlooBroadcast::Enabled(const ParameterManager& param_manager,
                            const std::vector<TensorTableEntry>& entries,
                            const Response& response) const {
  static_cast<void>(param_manager);
  static_cast<void>(entries);
  static_cast<void>(response);
  return true;
}
GlooAlltoall::GlooAlltoall(GlooContext* gloo_context,
HorovodGlobalState* global_state)
: AlltoallOp(global_state), gloo_context_(gloo_context) {}
// Performs an all-to-all exchange for the single entry in `entries`.
//
// entries:  must hold exactly one entry (asserted).
// response: not used by this implementation.
//
// Returns the failing Status from PrepareOutputAndParams, otherwise
// Status::OK(). GetAlgorithmsForType throws std::logic_error for a dtype it
// does not handle.
Status GlooAlltoall::Execute(std::vector<TensorTableEntry>& entries,
                             const Response& response) {
  assert(entries.size() == 1);
  auto& timeline = global_state_->timeline;

  // Work on a copy of the entry; PrepareOutputAndParams receives this copy.
  // NOTE(review): anything that helper stores on the entry is therefore not
  // visible through entries[0] -- confirm that is intended.
  auto entry = entries[0];

  // The displacement vectors are filled by the helper but only the counts are
  // passed on to Gloo.
  std::vector<int64_t> send_displs;
  std::vector<int64_t> recv_displs;
  std::vector<int64_t> send_counts;
  std::vector<int64_t> recv_counts;
  Status prepare_status = PrepareOutputAndParams(
      entry, send_displs, recv_displs, send_counts, recv_counts);
  if (!prepare_status.ok()) {
    return prepare_status;
  }

  timeline.ActivityStartAll(entries, MPI_ALLTOALL);
  std::unique_ptr<IGlooAlgorithms> gloo_algos(
      GetAlgorithmsForType(entry.tensor->dtype(), gloo_context_));
  gloo_algos->Alltoall((void*)entry.tensor->data(),
                       (void*)entry.output->data(), send_counts, recv_counts);
  timeline.ActivityEndAll(entries);

  return Status::OK();
}
// Always reports the Gloo all-to-all as usable; none of the arguments is
// inspected.
bool GlooAlltoall::Enabled(const ParameterManager& param_manager,
                           const std::vector<TensorTableEntry>& entries,
                           const Response& response) const {
  static_cast<void>(param_manager);
  static_cast<void>(entries);
  static_cast<void>(response);
  return true;
}
} // namespace common
} // namespace horovod