in tensorflow_federated/cc/core/impl/executors/composing_executor.cc [584:665]
// Implements the `federated_aggregate` intrinsic across the child executors.
//
// `arg` must be a 5-element structure `(value, zero, accumulate, merge,
// report)` where `value` is clients-placed. The aggregation proceeds in two
// stages:
//   1. Each child is asked to run `federated_aggregate` over its own slice of
//      `value`, using `zero`, `accumulate`, `merge`, and the computation
//      returned by `IdentityComp()` in place of `report`.
//   2. Each child's result is materialized, embedded into `server_`, folded
//      together with `merge`, and finally passed through `report` on
//      `server_`.
//
// Returns a server-placed `ExecutorValue` holding the result of `report`.
//
// Errors:
//   - InvalidArgument if `value` is not placed at clients.
//   - Internal if a child returns a value that is not server-placed, or a
//     server-placed value that carries no payload.
//   - FailedPrecondition if there are no child executors, so there is nothing
//     to pass to `report`.
//   - Any error propagated by `TFF_TRY` from the helpers and executors called
//     below.
absl::StatusOr<ExecutorValue> CallIntrinsicAggregate(
    ExecutorValue&& arg) const {
  TFF_TRY(arg.CheckLenForUseAsArgument("federated_aggregate", 5));
  const auto& value = arg.structure()->at(0);
  if (value.type() != ExecutorValue::ValueType::CLIENTS) {
    return absl::InvalidArgumentError(
        "Cannot aggregate a value not placed at clients");
  }
  // `zero` is materialized to a proto so it can be re-created in every child.
  const auto& zero = arg.structure()->at(1);
  v0::Value zero_val;
  ParallelTasks tasks;
  TFF_TRY(MaterializeValue(zero, &zero_val, tasks));
  TFF_TRY(tasks.WaitAll());
  const auto& accumulate = arg.structure()->at(2);
  auto accumulate_val =
      TFF_TRY(accumulate.GetUnplacedFunctionProto("accumulate"));
  // `merge` is needed both as a proto (sent to the children) and embedded in
  // `server_` (used to combine the per-child results).
  const auto& merge = arg.structure()->at(3);
  auto merge_val = TFF_TRY(merge.GetUnplacedFunctionProto("merge"));
  auto merge_id = TFF_TRY(merge.unplaced()->Embedded(*server_));
  // `report` only runs on `server_`. The proto is not used below; the call is
  // kept because it can fail (presumably it validates that `report` is an
  // unplaced function -- confirm against GetUnplacedFunctionProto).
  const auto& report = arg.structure()->at(4);
  auto report_val = TFF_TRY(report.GetUnplacedFunctionProto("report"));
  auto report_id = TFF_TRY(report.unplaced()->Embedded(*server_));
  // Children receive the `IdentityComp()` computation instead of `report`;
  // `report` itself is applied once, after all child results are merged.
  v0::Value null_report_val;
  *null_report_val.mutable_computation() = IdentityComp();
  v0::Value aggregate;
  aggregate.mutable_computation()->mutable_intrinsic()->mutable_uri()->assign(
      kFederatedAggregateUri.data(), kFederatedAggregateUri.size());
  // The non-client arguments sent to every child. They are held by pointer
  // because a braced list of `v0::Value` would build a
  // `std::initializer_list<v0::Value>` and deep-copy all four protos on every
  // iteration of the loop below.
  const v0::Value* const child_arg_protos[] = {
      &zero_val, &*accumulate_val, &*merge_val, &null_report_val};
  constexpr size_t kNumChildArgProtos =
      sizeof(child_arg_protos) / sizeof(child_arg_protos[0]);
  // Initiate the aggregation in each child.
  std::vector<OwnedValueId> child_result_ids;
  child_result_ids.reserve(children_.size());
  for (uint32_t i = 0; i < children_.size(); i++) {
    const auto& child = children_[i].executor();
    ValueId child_val = value.clients()->at(i)->ref();
    // `arg_owners` holds the owned ids for as long as `arg_ids` (plain refs)
    // is in use by the calls below.
    std::vector<OwnedValueId> arg_owners;
    arg_owners.reserve(kNumChildArgProtos);
    std::vector<ValueId> arg_ids;
    arg_ids.reserve(kNumChildArgProtos + 1);
    arg_ids.emplace_back(child_val);
    for (const v0::Value* arg_value : child_arg_protos) {
      OwnedValueId child_id = TFF_TRY(child->CreateValue(*arg_value));
      arg_ids.emplace_back(child_id.ref());
      arg_owners.emplace_back(std::move(child_id));
    }
    auto child_arg_id = TFF_TRY(child->CreateStruct(std::move(arg_ids)));
    auto child_aggregate_id = TFF_TRY(child->CreateValue(aggregate));
    auto child_result_id =
        TFF_TRY(child->CreateCall(child_aggregate_id, child_arg_id));
    child_result_ids.push_back(std::move(child_result_id));
  }
  // Materialize and merge the results from each child executor.
  // TODO(b/192457028): parallelize this so that we're materializing more than
  // one value at a time and so that we can begin merging as soon as any
  // result is available.
  absl::optional<OwnedValueId> current = absl::nullopt;
  for (uint32_t i = 0; i < children_.size(); i++) {
    const auto& child = children_[i].executor();
    v0::Value child_result = TFF_TRY(child->Materialize(child_result_ids[i]));
    if (!child_result.has_federated() ||
        child_result.federated().type().placement().value().uri() !=
            kServerUri) {
      return absl::InternalError(
          "Child executor returned non-server-placed value");
    }
    // Guard the indexed access below: reading element 0 of an empty repeated
    // field is out of range.
    if (child_result.federated().value_size() == 0) {
      return absl::InternalError(
          "Child executor returned server-placed value with no payload");
    }
    auto child_result_server_id =
        TFF_TRY(server_->CreateValue(child_result.federated().value(0)));
    if (current.has_value()) {
      auto merge_arg = TFF_TRY(
          server_->CreateStruct({current.value(), child_result_server_id}));
      current = TFF_TRY(server_->CreateCall(merge_id->ref(), merge_arg));
    } else {
      // First child result: nothing to merge with yet.
      current = std::move(child_result_server_id);
    }
  }
  // `current` is only empty when the loops above never ran, i.e. there are no
  // children. Report that as an error rather than dereferencing an empty
  // optional.
  if (!current.has_value()) {
    return absl::FailedPreconditionError(
        "Cannot aggregate: composing executor has no child executors");
  }
  auto result =
      TFF_TRY(server_->CreateCall(report_id->ref(), current.value()));
  return ExecutorValue::CreateServerPlaced(ShareValueId(std::move(result)));
}