in tensorflow_federated/cc/core/impl/executors/federating_executor.cc [365:475]
// Dispatches a call to the federated intrinsic `function` with argument `arg`.
//
// Each case validates the shape/placement of `arg` that it relies on, issues
// the necessary calls against `child_`, and wraps the resulting child value(s)
// in a server-placed or clients-placed `ExecutorValue`.
//
// Returns the resulting `ExecutorValue`, or a non-OK status when an argument
// check or a child-executor operation fails (failures are forwarded through
// TFF_TRY, which presumably returns early on a non-OK status). Returns an
// internal error if `function` is not one of the enumerators handled below.
absl::StatusOr<ExecutorValue> CallFederatedIntrinsic(
    FederatedIntrinsic function, ExecutorValue arg) {
  // NOTE: intentionally no `default:` label, so that compilers can still warn
  // when a newly added `FederatedIntrinsic` enumerator is not handled here.
  switch (function) {
    case FederatedIntrinsic::VALUE_AT_CLIENTS: {
      return ClientsAllEqualValue(TFF_TRY(Embed(arg)));
    }
    case FederatedIntrinsic::VALUE_AT_SERVER: {
      return ExecutorValue::CreateServerPlaced(TFF_TRY(Embed(arg)));
    }
    case FederatedIntrinsic::EVAL_AT_SERVER: {
      // `arg` is the function to evaluate; it is called once with no
      // argument and the result becomes the server-placed value.
      auto embedded = TFF_TRY(Embed(arg));
      return ExecutorValue::CreateServerPlaced(ShareValueId(
          TFF_TRY(child_->CreateCall(embedded->ref(), absl::nullopt))));
    }
    case FederatedIntrinsic::EVAL_AT_CLIENTS: {
      // The function is called (with no argument) once per client, producing
      // `num_clients_` separate results.
      auto embedded = TFF_TRY(Embed(arg));
      Clients client_values = NewClients();
      for (int i = 0; i < num_clients_; i++) {
        client_values->emplace_back(ShareValueId(
            TFF_TRY(child_->CreateCall(embedded->ref(), absl::nullopt))));
      }
      return ExecutorValue::CreateClientsPlaced(std::move(client_values));
    }
    case FederatedIntrinsic::AGGREGATE: {
      // Argument layout: (value, zero, accumulate, merge, report).
      TFF_TRY(CheckLenForUseAsArgument(arg, "federated_aggregate", 5));
      const auto& value = arg.structure()->at(0);
      const auto& zero = arg.structure()->at(1);
      auto zero_child_id = TFF_TRY(Embed(zero));
      const auto& accumulate = arg.structure()->at(2);
      auto accumulate_child_id = TFF_TRY(Embed(accumulate));
      // `merge` is unused (argument four).
      const auto& report = arg.structure()->at(4);
      auto report_child_id = TFF_TRY(Embed(report));
      TFF_TRY(value.CheckArgumentType(ExecutorValue::ValueType::CLIENTS,
                                      "`federated_aggregate`'s `value`"));
      // Sequential fold over the client values, starting from `zero`:
      //   current = accumulate(current, client_val)
      // `current_owner` holds the most recent accumulate result so that the
      // `current` reference taken from it stays usable in the next iteration
      // (presumably `OwnedValueId` releases the child value on destruction --
      // confirm against its definition).
      absl::optional<OwnedValueId> current_owner = absl::nullopt;
      ValueId current = zero_child_id->ref();
      for (const auto& client_val_id : *value.clients()) {
        auto acc_arg =
            TFF_TRY(child_->CreateStruct({current, client_val_id->ref()}));
        current_owner =
            TFF_TRY(child_->CreateCall(accumulate_child_id->ref(), acc_arg));
        current = current_owner.value().ref();
      }
      // With zero clients, `report` is applied directly to `zero`.
      auto result =
          TFF_TRY(child_->CreateCall(report_child_id->ref(), current));
      return ExecutorValue::CreateServerPlaced(
          ShareValueId(std::move(result)));
    }
    case FederatedIntrinsic::BROADCAST: {
      TFF_TRY(arg.CheckArgumentType(ExecutorValue::ValueType::SERVER,
                                    "`federated_broadcast`"));
      return ClientsAllEqualValue(arg.server());
    }
    case FederatedIntrinsic::MAP: {
      // Argument layout: (fn, data). `data` may be clients- or
      // server-placed; anything else is rejected.
      TFF_TRY(CheckLenForUseAsArgument(arg, "federated_map", 2));
      const auto& fn = arg.structure()->at(0);
      auto child_fn = TFF_TRY(Embed(fn));
      ValueId child_fn_ref = child_fn->ref();
      const auto& data = arg.structure()->at(1);
      if (data.type() == ExecutorValue::ValueType::CLIENTS) {
        // Apply `fn` to each of the `num_clients_` client values in order.
        Clients results = NewClients();
        for (int i = 0; i < num_clients_; i++) {
          auto client_arg = data.clients()->at(i)->ref();
          auto result = TFF_TRY(child_->CreateCall(child_fn_ref, client_arg));
          results->emplace_back(ShareValueId(std::move(result)));
        }
        return ExecutorValue::CreateClientsPlaced(std::move(results));
      } else if (data.type() == ExecutorValue::ValueType::SERVER) {
        auto res =
            TFF_TRY(child_->CreateCall(child_fn_ref, data.server()->ref()));
        return ExecutorValue::CreateServerPlaced(
            ShareValueId(std::move(res)));
      } else {
        return absl::InvalidArgumentError(
            "Attempted to map non-federated value.");
      }
    }
    case FederatedIntrinsic::SELECT: {
      // Argument layout: (keys, max_key, server_val, select_fn).
      TFF_TRY(CheckLenForUseAsArgument(arg, "federated_select", 4));
      const auto& keys = arg.structure()->at(0);
      // Argument two (`max_key`) is unused in this impl.
      const auto& server_val = arg.structure()->at(2);
      const auto& select_fn = arg.structure()->at(3);
      // Each accessor below is only used after its placement check passes.
      TFF_TRY(keys.CheckArgumentType(ExecutorValue::ValueType::CLIENTS,
                                     "`federated_select`'s `keys`"));
      const Clients& keys_child_ids = keys.clients();
      TFF_TRY(
          server_val.CheckArgumentType(ExecutorValue::ValueType::SERVER,
                                       "`federated_select`'s `server_val`"));
      ValueId server_val_child_id = server_val.server()->ref();
      TFF_TRY(
          select_fn.CheckArgumentType(ExecutorValue::ValueType::UNPLACED,
                                      "`federated_select`'s `select_fn`"));
      ValueId select_fn_child_id = select_fn.unplaced()->ref();
      return CallFederatedSelect(keys_child_ids, server_val_child_id,
                                 select_fn_child_id);
    }
    case FederatedIntrinsic::ZIP_AT_CLIENTS: {
      // Build one zipped value per client index.
      Clients results = NewClients();
      for (uint32_t i = 0; i < num_clients_; i++) {
        results->push_back(TFF_TRY(ZipStructIntoClient(arg, i)));
      }
      return ExecutorValue::CreateClientsPlaced(std::move(results));
    }
    case FederatedIntrinsic::ZIP_AT_SERVER: {
      return ExecutorValue::CreateServerPlaced(
          TFF_TRY(ZipStructIntoServer(arg)));
    }
  }
  // Only reachable if `function` holds a value outside the enumerators
  // handled above. Without this return, control would flow off the end of a
  // non-void function, which is undefined behavior.
  return absl::InternalError("Unrecognized federated intrinsic.");
}